You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/11 08:26:49 UTC

[skywalking] 01/01: feature: Envoy access log receiver supports TCP logs

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/envoy-tcp-log
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 20fa717d26b93cd947f828ea00a865e3488dec0e
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sun Apr 11 14:53:05 2021 +0800

    feature: Envoy access log receiver supports TCP logs
---
 apm-protocol/apm-network/src/main/proto            |   2 +-
 .../src/main/resources/application.yml             |   1 +
 .../src/main/resources/component-libraries.yml     |   3 +
 .../oap/server/core/source/RequestType.java        |   3 +-
 .../envoy/AccessLogServiceGRPCHandler.java         |  33 +++-
 .../receiver/envoy/EnvoyMetricReceiverConfig.java  |   8 +
 .../server/receiver/envoy/als/ALSHTTPAnalysis.java |  31 +---
 .../receiver/envoy/als/AbstractALSAnalyzer.java    |  20 ---
 ...ALSHTTPAnalysis.java => AccessLogAnalyzer.java} |  32 ++--
 .../envoy/als/LogEntry2MetricsAdapter.java         |  10 +-
 .../receiver/envoy/als/k8s/K8SServiceRegistry.java |   4 +-
 .../AbstractTCPAccessLogAnalyzer.java}             |  32 +---
 .../envoy/als/tcp/TCPAccessLogAnalyzer.java}       |  18 +-
 .../envoy/als/tcp/TCPLogEntry2MetricsAdapter.java  | 112 +++++++++++++
 .../als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java  | 181 +++++++++++++++++++++
 .../tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java   | 144 ++++++++++++++++
 ...ver.receiver.envoy.als.tcp.TCPAccessLogAnalyzer |  20 +++
 .../receiver/mesh/TelemetryDataDispatcher.java     |   5 +
 18 files changed, 548 insertions(+), 111 deletions(-)

diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index ce9e4e8..9a689b0 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit ce9e4e8bd9e552443cc970df67ee25f17ff0d3b8
+Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 29c6754..3d50506 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -325,6 +325,7 @@ envoy-metric:
   default:
     acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true}
     alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""}
+    alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""}
     # `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata,
     # the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}`
     # to append the version number to the service name.
diff --git a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
index 61fc318..646dcc4 100755
--- a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
+++ b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
@@ -350,6 +350,9 @@ Apache-CXF:
 dolphinscheduler:
   id: 106
   languages: Java
+tcp:
+  id: 107
+  languages: Java
 
 # .NET/.NET Core components
 # [3000, 4000) for C#/.NET only
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
index 3298da0..6eea865 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
@@ -29,5 +29,6 @@ public enum RequestType {
     /**
      * Logic request only.
      */
-    LOGIC
+    LOGIC,
+    TCP
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index bce63fb..f3bc5ef 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.receiver.envoy;
 
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
 import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
@@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory;
 public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
     private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
     private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
+    private final List<TCPAccessLogAnalyzer> envoyTCPAnalysisList;
 
     private final CounterMetrics counter;
     private final HistogramMetrics histogram;
@@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
     public AccessLogServiceGRPCHandler(ModuleManager manager,
                                        EnvoyMetricReceiverConfig config) throws ModuleStartException {
         ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
+        ServiceLoader<TCPAccessLogAnalyzer> alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class);
         envoyHTTPAnalysisList = new ArrayList<>();
         for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
             for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
@@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                 }
             }
         }
+        envoyTCPAnalysisList = new ArrayList<>();
+        for (String analyzerName : config.getAlsTCPAnalysis()) {
+            for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) {
+                if (analyzerName.equals(tcpAnalyzer.name())) {
+                    tcpAnalyzer.init(manager, config);
+                    envoyTCPAnalysisList.add(tcpAnalyzer);
+                }
+            }
+        }
 
-        LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
+        LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList);
 
         MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
         counter = metricCreator.createCounter(
@@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                                 .getId(), role, logCase, message);
                     }
 
+                    List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
                     switch (logCase) {
                         case HTTP_LOGS:
                             StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
 
-                            List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
                             for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
                                 List<ServiceMeshMetric.Builder> result = new ArrayList<>();
                                 for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
@@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                                 sourceResult.addAll(result);
                             }
 
-                            sourceDispatcherCounter.inc(sourceResult.size());
-                            sourceResult.forEach(TelemetryDataDispatcher::process);
+                            break;
+                        case TCP_LOGS:
+                            StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
+
+                            for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
+                                List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+                                for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
+                                    result = analyzer.analysis(result, identifier, tcpLog, role);
+                                }
+                                sourceResult.addAll(result);
+                            }
+
                             break;
                     }
+                    sourceDispatcherCounter.inc(sourceResult.size());
+                    sourceResult.forEach(TelemetryDataDispatcher::process);
                 } finally {
                     timer.finish();
                 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
index 94f8c45..e10d95a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
@@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
     @Getter
     private boolean acceptMetricsService = false;
     private String alsHTTPAnalysis;
+    private String alsTCPAnalysis;
     @Getter
     private String k8sServiceNameRule;
 
@@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
         return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
     }
 
+    public List<String> getAlsTCPAnalysis() {
+        if (Strings.isNullOrEmpty(alsTCPAnalysis)) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
+    }
+
     public List<Rule> rules() throws ModuleStartException {
         return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy"));
     }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 7058e30..ce69eaa 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -19,38 +19,9 @@
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.util.List;
-import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
 /**
  * Analysis source metrics from ALS
  */
-public interface ALSHTTPAnalysis {
-    String name();
-
-    void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
-
-    /**
-     * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
-     *
-     * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
-     *
-     * @param result of the previous analyzer.
-     * @param identifier of the Envoy node where the logs are emitted.
-     * @param entry the log entry.
-     * @param role the role of the Envoy node where the logs are emitted.
-     * @return the analysis results.
-     */
-    List<ServiceMeshMetric.Builder> analysis(
-        final List<ServiceMeshMetric.Builder> result,
-        final StreamAccessLogsMessage.Identifier identifier,
-        final HTTPAccessLogEntry entry,
-        final Role role
-    );
-
-    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+public interface ALSHTTPAnalysis extends AccessLogAnalyzer<HTTPAccessLogEntry> {
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
index 0ccc147..2bb71af 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -18,33 +18,13 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 
 @Slf4j
 public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
 
-    @Override
-    public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
-        if (alsIdentifier == null) {
-            return defaultRole;
-        }
-        if (!alsIdentifier.hasNode()) {
-            return defaultRole;
-        }
-        final Node node = alsIdentifier.getNode();
-        final String id = node.getId();
-        if (id.startsWith("router~")) {
-            return Role.PROXY;
-        } else if (id.startsWith("sidecar~")) {
-            return Role.SIDECAR;
-        }
-        return defaultRole;
-    }
-
     /**
      * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
      *
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
similarity index 70%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
index 7058e30..4dcf883 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import java.util.List;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@@ -26,10 +26,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
-/**
- * Analysis source metrics from ALS
- */
-public interface ALSHTTPAnalysis {
+public interface AccessLogAnalyzer<E> {
     String name();
 
     void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
@@ -39,18 +36,33 @@ public interface ALSHTTPAnalysis {
      *
      * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
      *
-     * @param result of the previous analyzer.
+     * @param result     of the previous analyzer.
      * @param identifier of the Envoy node where the logs are emitted.
-     * @param entry the log entry.
-     * @param role the role of the Envoy node where the logs are emitted.
+     * @param entry      the log entry.
+     * @param role       the role of the Envoy node where the logs are emitted.
      * @return the analysis results.
      */
     List<ServiceMeshMetric.Builder> analysis(
         final List<ServiceMeshMetric.Builder> result,
         final StreamAccessLogsMessage.Identifier identifier,
-        final HTTPAccessLogEntry entry,
+        final E entry,
         final Role role
     );
 
-    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+    default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) {
+        if (alsIdentifier == null) {
+            return defaultRole;
+        }
+        if (!alsIdentifier.hasNode()) {
+            return defaultRole;
+        }
+        final Node node = alsIdentifier.getNode();
+        final String id = node.getId();
+        if (id.startsWith("router~")) {
+            return Role.PROXY;
+        } else if (id.startsWith("sidecar~")) {
+            return Role.SIDECAR;
+        }
+        return defaultRole;
+    }
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
index accdf23..0fa1c3a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -139,15 +139,15 @@ public class LogEntry2MetricsAdapter {
         return method + ":" + request.getPath();
     }
 
-    protected static long formatAsLong(final Timestamp timestamp) {
+    public static long formatAsLong(final Timestamp timestamp) {
         return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
     }
 
-    protected static long formatAsLong(final Duration duration) {
+    public static long formatAsLong(final Duration duration) {
         return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
     }
 
-    protected static Protocol requestProtocol(final HTTPRequestProperties request) {
+    public static Protocol requestProtocol(final HTTPRequestProperties request) {
         if (request == null) {
             return Protocol.HTTP;
         }
@@ -158,7 +158,7 @@ public class LogEntry2MetricsAdapter {
         return Protocol.gRPC;
     }
 
-    protected static String parseTLS(final TLSProperties properties) {
+    public static String parseTLS(final TLSProperties properties) {
         if (properties == null) {
             return NON_TLS;
         }
@@ -183,7 +183,7 @@ public class LogEntry2MetricsAdapter {
      * @param responseFlags in the ALS v2
      * @return empty string if no internal error code, or literal string representing the code.
      */
-    protected static String parseInternalErrorCode(final ResponseFlags responseFlags) {
+    public static String parseInternalErrorCode(final ResponseFlags responseFlags) {
         if (responseFlags != null) {
             if (responseFlags.getFailedLocalHealthcheck()) {
                 return "failed_local_healthcheck";
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index 158cd20..aa601ce 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -264,7 +264,7 @@ public class K8SServiceRegistry {
                      .collect(Collectors.toList());
     }
 
-    protected ServiceMetaInfo findService(final String ip) {
+    public ServiceMetaInfo findService(final String ip) {
         final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
         if (isNull(service)) {
             log.debug("Unknown ip {}, ip -> service is null", ip);
@@ -311,7 +311,7 @@ public class K8SServiceRegistry {
         });
     }
 
-    protected boolean isEmpty() {
+    public boolean isEmpty() {
         return ipServiceMetaInfoMap.isEmpty();
     }
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
similarity index 60%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
index 0ccc147..0516509 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
@@ -16,34 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.envoy.als;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
 
-import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
 
 @Slf4j
-public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
-
-    @Override
-    public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
-        if (alsIdentifier == null) {
-            return defaultRole;
-        }
-        if (!alsIdentifier.hasNode()) {
-            return defaultRole;
-        }
-        final Node node = alsIdentifier.getNode();
-        final String id = node.getId();
-        if (id.startsWith("router~")) {
-            return Role.PROXY;
-        } else if (id.startsWith("sidecar~")) {
-            return Role.SIDECAR;
-        }
-        return defaultRole;
-    }
+public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer {
 
     /**
      * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
@@ -53,11 +35,11 @@ public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
      * @param targetService the target/destination service.
      * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
      */
-    protected LogEntry2MetricsAdapter newAdapter(
-        final HTTPAccessLogEntry entry,
+    protected TCPLogEntry2MetricsAdapter newAdapter(
+        final TCPAccessLogEntry entry,
         final ServiceMetaInfo sourceService,
         final ServiceMetaInfo targetService) {
-        return new LogEntry2MetricsAdapter(entry, sourceService, targetService);
+        return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService);
     }
 
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
similarity index 73%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
index 3298da0..c073b82 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
@@ -16,18 +16,10 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.source;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
 
-/**
- * RPC request type.
- */
-public enum RequestType {
-    DATABASE,
-    HTTP,
-    RPC,
-    gRPC,
-    /**
-     * Logic request only.
-     */
-    LOGIC
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
+
+public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer<TCPAccessLogEntry> {
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
new file mode 100644
index 0000000..8d1572a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
+
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS;
+
+/**
+ * Adapt {@link TCPAccessLogEntry} objects (the TCP counterpart of {@link HTTPAccessLogEntry}) to {@link ServiceMeshMetric} builders.
+ */
+@RequiredArgsConstructor
+public class TCPLogEntry2MetricsAdapter {
+
+    /**
+     * The access log entry that is to be adapted into metrics builders.
+     */
+    protected final TCPAccessLogEntry entry;
+
+    protected final ServiceMetaInfo sourceService;
+
+    protected final ServiceMetaInfo targetService;
+
+    /**
+     * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+     */
+    public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+        return adaptCommonPart()
+            .setStartTime(startTime)
+            .setEndTime(startTime + duration)
+            .setLatency((int) Math.max(1L, duration))
+            .setDetectPoint(DetectPoint.server);
+    }
+
+    /**
+     * Adapt the {@code entry} into an upstream metrics {@link ServiceMeshMetric.Builder}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+     */
+    public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+        final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+        return adaptCommonPart()
+            .setStartTime(outboundStartTime)
+            .setEndTime(outboundEndTime)
+            .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime))
+            .setDetectPoint(DetectPoint.client);
+    }
+
+    protected ServiceMeshMetric.Builder adaptCommonPart() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final String tlsMode = parseTLS(properties.getTlsProperties());
+        final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags());
+
+        final ServiceMeshMetric.Builder builder =
+            ServiceMeshMetric.newBuilder()
+                             .setTlsMode(tlsMode)
+                             .setProtocol(Protocol.TCP)
+                             .setInternalErrorCode(internalErrorCode);
+
+        Optional.ofNullable(sourceService)
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setSourceServiceName);
+        Optional.ofNullable(sourceService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setSourceServiceInstance);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setDestServiceName);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setDestServiceInstance);
+
+        return builder;
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
new file mode 100644
index 0000000..97ea3f1
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s;
+
+import io.envoyproxy.envoy.config.core.v3.Address;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+
+/**
+ * Analyzes Envoy TCP access logs in ingress (proxy) and mesh (sidecar) scenarios, resolving the
+ * services on both ends of a connection by looking up their IP addresses in the
+ * {@link K8SServiceRegistry}.
+ */
+@Slf4j
+public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
+    protected K8SServiceRegistry serviceRegistry;
+
+    @Override
+    public String name() {
+        return "tcp-k8s-mesh";
+    }
+
+    /**
+     * Creates and starts the service registry used to resolve IP addresses.
+     */
+    @Override
+    @SneakyThrows
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
+        serviceRegistry = new K8SServiceRegistry(config);
+        serviceRegistry.start();
+    }
+
+    /**
+     * Analyzes one TCP access log entry, unless {@code result} already contains metrics.
+     *
+     * @param result     metrics produced so far; returned untouched when it is not empty.
+     * @param identifier identifier of the reporting Envoy, not used by this analyzer.
+     * @param entry      the TCP access log entry to analyze.
+     * @param role       role of the reporting Envoy, selects the proxy or the sidecar analysis.
+     * @return {@code result} when it is not empty, otherwise the metrics derived from {@code entry};
+     * an empty list when the service registry is empty or the role is neither proxy nor sidecar.
+     */
+    @Override
+    public List<ServiceMeshMetric.Builder> analysis(
+        final List<ServiceMeshMetric.Builder> result,
+        final StreamAccessLogsMessage.Identifier identifier,
+        final TCPAccessLogEntry entry,
+        final Role role
+    ) {
+        if (isNotEmpty(result)) {
+            return result;
+        }
+        if (serviceRegistry.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // Plain comparisons instead of a switch so that a null or any other role falls through
+        // to the empty result rather than throwing.
+        if (role == Role.PROXY) {
+            return analyzeProxy(entry);
+        }
+        if (role == Role.SIDECAR) {
+            return analyzeSideCar(entry);
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * Analyzes an entry reported by a sidecar. The upstream cluster name decides the direction:
+     * {@code inbound|...} produces a server side metric, {@code outbound|...} a client side metric,
+     * anything else produces nothing.
+     *
+     * @param entry the TCP access log entry to analyze.
+     * @return the metrics derived from the entry, possibly empty.
+     */
+    protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) {
+        // Protobuf getters never return null, presence has to be checked explicitly.
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+
+        final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
+
+        final String cluster = properties.getUpstreamCluster();
+        if (cluster.isEmpty()) {
+            // An unset cluster is the empty string and can match neither direction.
+            return sources;
+        }
+
+        final Address downstreamRemoteAddress =
+            properties.hasDownstreamDirectRemoteAddress()
+                ? properties.getDownstreamDirectRemoteAddress()
+                : properties.getDownstreamRemoteAddress();
+        final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
+        final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+        final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+        if (cluster.startsWith("inbound|")) {
+            // Server side
+            final ServiceMeshMetric.Builder metrics;
+            if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                // Ingress -> sidecar(server side)
+                // Mesh telemetry without source, the relation would be generated.
+                metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics);
+            } else {
+                // sidecar -> sidecar(server side)
+                metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics);
+            }
+            sources.add(metrics);
+        } else if (cluster.startsWith("outbound|")) {
+            // sidecar(client side) -> sidecar
+            final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+            final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
+
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics();
+
+            log.debug("Transformed sidecar(client side)->sidecar outbound mesh metric {}", metric);
+            sources.add(metric);
+        }
+
+        return sources;
+    }
+
+    /**
+     * Analyzes an entry reported by a proxy (ingress). Two metrics are produced: an inbound one
+     * from the downstream remote peer to the ingress, and an outbound one from the ingress to the
+     * upstream remote peer.
+     *
+     * @param entry the TCP access log entry to analyze.
+     * @return the inbound and outbound metrics, or an empty list when the entry carries no common
+     * properties.
+     */
+    protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) {
+        // Protobuf getters never return null, presence has to be checked explicitly.
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+        // NOTE(review): the address getters below return default instances rather than null, so an
+        // absent address reaches find() as an empty IP string. Confirm whether such entries should
+        // be skipped with has*() checks instead.
+        final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+        final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
+            properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
+        final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+
+        final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
+        final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
+
+        final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
+        final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
+
+        final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics();
+
+        log.debug("Transformed ingress inbound mesh metric {}", metric);
+        result.add(metric);
+
+        final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
+
+        final ServiceMeshMetric.Builder outboundMetric =
+            newAdapter(entry, ingress, targetService)
+                .adaptToUpstreamMetrics()
+                // Can't parse it from tls properties, leave it to Server side.
+                .setTlsMode(NON_TLS);
+
+        log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+        result.add(outboundMetric);
+
+        return result;
+    }
+
+    /**
+     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+     */
+    protected ServiceMetaInfo find(String ip) {
+        return serviceRegistry.findService(ip);
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
new file mode 100644
index 0000000..f317123
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.TextFormat;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY;
+
+/**
+ * Analyzes Envoy TCP access logs using the peer metadata carried in the filter state objects of
+ * an entry, under the {@code UPSTREAM_KEY} and {@code DOWNSTREAM_KEY} keys.
+ */
+@Slf4j
+public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer {
+    protected String fieldMappingFile = "metadata-service-mapping.yaml";
+
+    protected EnvoyMetricReceiverConfig config;
+
+    @Override
+    public String name() {
+        return "tcp-mx-mesh";
+    }
+
+    /**
+     * Keeps the configuration and initializes the {@link FieldsHelper} from the field mapping file.
+     *
+     * @throws ModuleStartException when the field mapping file cannot be loaded.
+     */
+    @Override
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
+        this.config = config;
+        try {
+            FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
+        } catch (final Exception e) {
+            // Report the file actually used, the field may have been changed by a subclass.
+            throw new ModuleStartException("Failed to load " + fieldMappingFile, e);
+        }
+    }
+
+    /**
+     * Analyzes one TCP access log entry, unless {@code result} already contains metrics.
+     *
+     * @param result     metrics produced so far; returned untouched when it is not empty, and
+     *                   never modified by this method.
+     * @param identifier identifier of the reporting Envoy, its node metadata describes the current
+     *                   service.
+     * @param entry      the TCP access log entry to analyze.
+     * @param role       role of the reporting Envoy.
+     * @return {@code result} when it is not empty, otherwise the metrics derived from
+     * {@code entry}, possibly empty.
+     */
+    @Override
+    public List<ServiceMeshMetric.Builder> analysis(
+        final List<ServiceMeshMetric.Builder> result,
+        final StreamAccessLogsMessage.Identifier identifier,
+        final TCPAccessLogEntry entry,
+        final Role role
+    ) {
+        if (isNotEmpty(result)) {
+            return result;
+        }
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+        if (stateMap.isEmpty()) {
+            return Collections.emptyList();
+        }
+        final ServiceMetaInfo currSvc;
+        try {
+            currSvc = adaptToServiceMetaInfo(identifier);
+        } catch (Exception e) {
+            log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
+            return Collections.emptyList();
+        }
+
+        // Collect into a fresh list: the incoming result is empty at this point, but it may be null
+        // or an immutable list such as Collections.emptyList(), which cannot be appended to.
+        final List<ServiceMeshMetric.Builder> analyzed = new ArrayList<>();
+
+        final AtomicBoolean downstreamExists = new AtomicBoolean();
+        stateMap.forEach((key, value) -> {
+            if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
+                return;
+            }
+            final ServiceMetaInfo svc;
+            try {
+                svc = adaptToServiceMetaInfo(value);
+            } catch (Exception e) {
+                // encodeToString, not encode: a byte[] argument would be logged as an array identity.
+                log.error(
+                    "Fail to parse metadata {} to FlatNode",
+                    Base64.getEncoder().encodeToString(value.toByteArray()),
+                    e
+                );
+                return;
+            }
+            final ServiceMeshMetric.Builder metrics;
+            switch (key) {
+                case UPSTREAM_KEY:
+                    metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    analyzed.add(metrics);
+                    break;
+                case DOWNSTREAM_KEY:
+                    metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    analyzed.add(metrics);
+                    downstreamExists.set(true);
+                    break;
+            }
+        });
+        // A proxy entry without downstream peer metadata still gets an inbound metric, with an
+        // unknown source.
+        if (role == Role.PROXY && !downstreamExists.get()) {
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
+            if (log.isDebugEnabled()) {
+                log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
+            }
+            analyzed.add(metric);
+        }
+        return analyzed;
+    }
+
+    /**
+     * Adapts one filter state object to a {@link ServiceMetaInfo}.
+     */
+    protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
+        return new ServiceMetaInfoAdapter(value);
+    }
+
+    /**
+     * Builds the {@link ServiceMetaInfo} of the reporting Envoy from the node metadata of the identifier.
+     */
+    protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
+        return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata());
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
new file mode 100644
index 0000000..5290deb
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index c1da14b..d347110 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -234,6 +234,8 @@ public class TelemetryDataDispatcher {
                 return RequestType.gRPC;
             case HTTP:
                 return RequestType.HTTP;
+            case TCP:
+                return RequestType.TCP;
             case UNRECOGNIZED:
             default:
                 return RequestType.RPC;
@@ -248,6 +250,9 @@ public class TelemetryDataDispatcher {
             case HTTP:
                 // HTTP in component-libraries.yml
                 return 49;
+            case TCP:
+                // TCP in component-libraries.yml
+                return 107;
             case UNRECOGNIZED:
             default:
                 // RPC in component-libraries.yml