You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/11 08:26:48 UTC

[skywalking] branch feature/envoy-tcp-log created (now 20fa717)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch feature/envoy-tcp-log
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 20fa717  feature: Envoy access log receiver supports TCP logs

This branch includes the following new commits:

     new 20fa717  feature: Envoy access log receiver supports TCP logs

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking] 01/01: feature: Envoy access log receiver supports TCP logs

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/envoy-tcp-log
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 20fa717d26b93cd947f828ea00a865e3488dec0e
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sun Apr 11 14:53:05 2021 +0800

    feature: Envoy access log receiver supports TCP logs
---
 apm-protocol/apm-network/src/main/proto            |   2 +-
 .../src/main/resources/application.yml             |   1 +
 .../src/main/resources/component-libraries.yml     |   3 +
 .../oap/server/core/source/RequestType.java        |   3 +-
 .../envoy/AccessLogServiceGRPCHandler.java         |  33 +++-
 .../receiver/envoy/EnvoyMetricReceiverConfig.java  |   8 +
 .../server/receiver/envoy/als/ALSHTTPAnalysis.java |  31 +---
 .../receiver/envoy/als/AbstractALSAnalyzer.java    |  20 ---
 ...ALSHTTPAnalysis.java => AccessLogAnalyzer.java} |  32 ++--
 .../envoy/als/LogEntry2MetricsAdapter.java         |  10 +-
 .../receiver/envoy/als/k8s/K8SServiceRegistry.java |   4 +-
 .../AbstractTCPAccessLogAnalyzer.java}             |  32 +---
 .../envoy/als/tcp/TCPAccessLogAnalyzer.java}       |  18 +-
 .../envoy/als/tcp/TCPLogEntry2MetricsAdapter.java  | 112 +++++++++++++
 .../als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java  | 181 +++++++++++++++++++++
 .../tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java   | 144 ++++++++++++++++
 ...ver.receiver.envoy.als.tcp.TCPAccessLogAnalyzer |  20 +++
 .../receiver/mesh/TelemetryDataDispatcher.java     |   5 +
 18 files changed, 548 insertions(+), 111 deletions(-)

diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index ce9e4e8..9a689b0 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit ce9e4e8bd9e552443cc970df67ee25f17ff0d3b8
+Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 29c6754..3d50506 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -325,6 +325,7 @@ envoy-metric:
   default:
     acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true}
     alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""}
+    alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""}
     # `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata,
     # the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}`
     # to append the version number to the service name.
diff --git a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
index 61fc318..646dcc4 100755
--- a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
+++ b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
@@ -350,6 +350,9 @@ Apache-CXF:
 dolphinscheduler:
   id: 106
   languages: Java
+tcp:
+  id: 107
+  languages: Java
 
 # .NET/.NET Core components
 # [3000, 4000) for C#/.NET only
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
index 3298da0..6eea865 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
@@ -29,5 +29,6 @@ public enum RequestType {
     /**
      * Logic request only.
      */
-    LOGIC
+    LOGIC,
+    TCP
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index bce63fb..f3bc5ef 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.receiver.envoy;
 
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
 import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
@@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory;
 public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
     private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
     private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
+    private final List<TCPAccessLogAnalyzer> envoyTCPAnalysisList;
 
     private final CounterMetrics counter;
     private final HistogramMetrics histogram;
@@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
     public AccessLogServiceGRPCHandler(ModuleManager manager,
                                        EnvoyMetricReceiverConfig config) throws ModuleStartException {
         ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
+        ServiceLoader<TCPAccessLogAnalyzer> alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class);
         envoyHTTPAnalysisList = new ArrayList<>();
         for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
             for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
@@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                 }
             }
         }
+        envoyTCPAnalysisList = new ArrayList<>();
+        for (String analyzerName : config.getAlsTCPAnalysis()) {
+            for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) {
+                if (analyzerName.equals(tcpAnalyzer.name())) {
+                    tcpAnalyzer.init(manager, config);
+                    envoyTCPAnalysisList.add(tcpAnalyzer);
+                }
+            }
+        }
 
-        LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
+        LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList);
 
         MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
         counter = metricCreator.createCounter(
@@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                                 .getId(), role, logCase, message);
                     }
 
+                    List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
                     switch (logCase) {
                         case HTTP_LOGS:
                             StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
 
-                            List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
                             for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
                                 List<ServiceMeshMetric.Builder> result = new ArrayList<>();
                                 for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
@@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
                                 sourceResult.addAll(result);
                             }
 
-                            sourceDispatcherCounter.inc(sourceResult.size());
-                            sourceResult.forEach(TelemetryDataDispatcher::process);
+                            break;
+                        case TCP_LOGS:
+                            StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
+
+                            for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
+                                List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+                                for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
+                                    result = analyzer.analysis(result, identifier, tcpLog, role);
+                                }
+                                sourceResult.addAll(result);
+                            }
+
                             break;
                     }
+                    sourceDispatcherCounter.inc(sourceResult.size());
+                    sourceResult.forEach(TelemetryDataDispatcher::process);
                 } finally {
                     timer.finish();
                 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
index 94f8c45..e10d95a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
@@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
     @Getter
     private boolean acceptMetricsService = false;
     private String alsHTTPAnalysis;
+    private String alsTCPAnalysis;
     @Getter
     private String k8sServiceNameRule;
 
@@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
         return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
     }
 
+    public List<String> getAlsTCPAnalysis() {
+        if (Strings.isNullOrEmpty(alsTCPAnalysis)) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
+    }
+
     public List<Rule> rules() throws ModuleStartException {
         return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy"));
     }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 7058e30..ce69eaa 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -19,38 +19,9 @@
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.util.List;
-import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
 /**
  * Analysis source metrics from ALS
  */
-public interface ALSHTTPAnalysis {
-    String name();
-
-    void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
-
-    /**
-     * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
-     *
-     * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
-     *
-     * @param result of the previous analyzer.
-     * @param identifier of the Envoy node where the logs are emitted.
-     * @param entry the log entry.
-     * @param role the role of the Envoy node where the logs are emitted.
-     * @return the analysis results.
-     */
-    List<ServiceMeshMetric.Builder> analysis(
-        final List<ServiceMeshMetric.Builder> result,
-        final StreamAccessLogsMessage.Identifier identifier,
-        final HTTPAccessLogEntry entry,
-        final Role role
-    );
-
-    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+public interface ALSHTTPAnalysis extends AccessLogAnalyzer<HTTPAccessLogEntry> {
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
index 0ccc147..2bb71af 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -18,33 +18,13 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 
 @Slf4j
 public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
 
-    @Override
-    public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
-        if (alsIdentifier == null) {
-            return defaultRole;
-        }
-        if (!alsIdentifier.hasNode()) {
-            return defaultRole;
-        }
-        final Node node = alsIdentifier.getNode();
-        final String id = node.getId();
-        if (id.startsWith("router~")) {
-            return Role.PROXY;
-        } else if (id.startsWith("sidecar~")) {
-            return Role.SIDECAR;
-        }
-        return defaultRole;
-    }
-
     /**
      * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
      *
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
similarity index 70%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
index 7058e30..4dcf883 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
 import java.util.List;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@@ -26,10 +26,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
-/**
- * Analysis source metrics from ALS
- */
-public interface ALSHTTPAnalysis {
+public interface AccessLogAnalyzer<E> {
     String name();
 
     void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
@@ -39,18 +36,33 @@ public interface ALSHTTPAnalysis {
      *
      * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
      *
-     * @param result of the previous analyzer.
+     * @param result     of the previous analyzer.
      * @param identifier of the Envoy node where the logs are emitted.
-     * @param entry the log entry.
-     * @param role the role of the Envoy node where the logs are emitted.
+     * @param entry      the log entry.
+     * @param role       the role of the Envoy node where the logs are emitted.
      * @return the analysis results.
      */
     List<ServiceMeshMetric.Builder> analysis(
         final List<ServiceMeshMetric.Builder> result,
         final StreamAccessLogsMessage.Identifier identifier,
-        final HTTPAccessLogEntry entry,
+        final E entry,
         final Role role
     );
 
-    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+    default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) {
+        if (alsIdentifier == null) {
+            return defaultRole;
+        }
+        if (!alsIdentifier.hasNode()) {
+            return defaultRole;
+        }
+        final Node node = alsIdentifier.getNode();
+        final String id = node.getId();
+        if (id.startsWith("router~")) {
+            return Role.PROXY;
+        } else if (id.startsWith("sidecar~")) {
+            return Role.SIDECAR;
+        }
+        return defaultRole;
+    }
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
index accdf23..0fa1c3a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -139,15 +139,15 @@ public class LogEntry2MetricsAdapter {
         return method + ":" + request.getPath();
     }
 
-    protected static long formatAsLong(final Timestamp timestamp) {
+    public static long formatAsLong(final Timestamp timestamp) {
         return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
     }
 
-    protected static long formatAsLong(final Duration duration) {
+    public static long formatAsLong(final Duration duration) {
         return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
     }
 
-    protected static Protocol requestProtocol(final HTTPRequestProperties request) {
+    public static Protocol requestProtocol(final HTTPRequestProperties request) {
         if (request == null) {
             return Protocol.HTTP;
         }
@@ -158,7 +158,7 @@ public class LogEntry2MetricsAdapter {
         return Protocol.gRPC;
     }
 
-    protected static String parseTLS(final TLSProperties properties) {
+    public static String parseTLS(final TLSProperties properties) {
         if (properties == null) {
             return NON_TLS;
         }
@@ -183,7 +183,7 @@ public class LogEntry2MetricsAdapter {
      * @param responseFlags in the ALS v2
      * @return empty string if no internal error code, or literal string representing the code.
      */
-    protected static String parseInternalErrorCode(final ResponseFlags responseFlags) {
+    public static String parseInternalErrorCode(final ResponseFlags responseFlags) {
         if (responseFlags != null) {
             if (responseFlags.getFailedLocalHealthcheck()) {
                 return "failed_local_healthcheck";
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index 158cd20..aa601ce 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -264,7 +264,7 @@ public class K8SServiceRegistry {
                      .collect(Collectors.toList());
     }
 
-    protected ServiceMetaInfo findService(final String ip) {
+    public ServiceMetaInfo findService(final String ip) {
         final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
         if (isNull(service)) {
             log.debug("Unknown ip {}, ip -> service is null", ip);
@@ -311,7 +311,7 @@ public class K8SServiceRegistry {
         });
     }
 
-    protected boolean isEmpty() {
+    public boolean isEmpty() {
         return ipServiceMetaInfoMap.isEmpty();
     }
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
similarity index 60%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
index 0ccc147..0516509 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
@@ -16,34 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.envoy.als;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
 
-import io.envoyproxy.envoy.config.core.v3.Node;
 import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
 
 @Slf4j
-public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
-
-    @Override
-    public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
-        if (alsIdentifier == null) {
-            return defaultRole;
-        }
-        if (!alsIdentifier.hasNode()) {
-            return defaultRole;
-        }
-        final Node node = alsIdentifier.getNode();
-        final String id = node.getId();
-        if (id.startsWith("router~")) {
-            return Role.PROXY;
-        } else if (id.startsWith("sidecar~")) {
-            return Role.SIDECAR;
-        }
-        return defaultRole;
-    }
+public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer {
 
     /**
      * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
@@ -53,11 +35,11 @@ public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
      * @param targetService the target/destination service.
      * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
      */
-    protected LogEntry2MetricsAdapter newAdapter(
-        final HTTPAccessLogEntry entry,
+    protected TCPLogEntry2MetricsAdapter newAdapter(
+        final TCPAccessLogEntry entry,
         final ServiceMetaInfo sourceService,
         final ServiceMetaInfo targetService) {
-        return new LogEntry2MetricsAdapter(entry, sourceService, targetService);
+        return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService);
     }
 
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
similarity index 73%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
index 3298da0..c073b82 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
@@ -16,18 +16,10 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.source;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
 
-/**
- * RPC request type.
- */
-public enum RequestType {
-    DATABASE,
-    HTTP,
-    RPC,
-    gRPC,
-    /**
-     * Logic request only.
-     */
-    LOGIC
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
+
+public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer<TCPAccessLogEntry> {
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
new file mode 100644
index 0000000..8d1572a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
+
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS;
+
+/**
+ * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders.
+ */
+@RequiredArgsConstructor
+public class TCPLogEntry2MetricsAdapter {
+
+    /**
+     * The access log entry that is to be adapted into metrics builders.
+     */
+    protected final TCPAccessLogEntry entry;
+
+    protected final ServiceMetaInfo sourceService;
+
+    protected final ServiceMetaInfo targetService;
+
+    /**
+     * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+     */
+    public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+        return adaptCommonPart()
+            .setStartTime(startTime)
+            .setEndTime(startTime + duration)
+            .setLatency((int) Math.max(1L, duration))
+            .setDetectPoint(DetectPoint.server);
+    }
+
+    /**
+     * Adapt the {@code entry} into an upstream metrics {@link ServiceMeshMetric.Builder}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+     */
+    public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+        final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+        return adaptCommonPart()
+            .setStartTime(outboundStartTime)
+            .setEndTime(outboundEndTime)
+            .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime))
+            .setDetectPoint(DetectPoint.client);
+    }
+
+    protected ServiceMeshMetric.Builder adaptCommonPart() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final String tlsMode = parseTLS(properties.getTlsProperties());
+        final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags());
+
+        final ServiceMeshMetric.Builder builder =
+            ServiceMeshMetric.newBuilder()
+                             .setTlsMode(tlsMode)
+                             .setProtocol(Protocol.TCP)
+                             .setInternalErrorCode(internalErrorCode);
+
+        Optional.ofNullable(sourceService)
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setSourceServiceName);
+        Optional.ofNullable(sourceService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setSourceServiceInstance);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setDestServiceName);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setDestServiceInstance);
+
+        return builder;
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
new file mode 100644
index 0000000..97ea3f1
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s;
+
+import io.envoyproxy.envoy.config.core.v3.Address;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+
+/**
+ * TCP access log analyzer that resolves the two ends of a connection by looking up their IP addresses in a
+ * {@link K8SServiceRegistry}, for entries reported by both proxy and sidecar roles.
+ */
+@Slf4j
+public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
+    /**
+     * Registry used by {@link #find(String)} to resolve an IP address into service metadata; created and started
+     * in {@link #init(ModuleManager, EnvoyMetricReceiverConfig)}.
+     */
+    protected K8SServiceRegistry serviceRegistry;
+
+    /**
+     * @return the name of this analyzer, {@code tcp-k8s-mesh}.
+     */
+    @Override
+    public String name() {
+        return "tcp-k8s-mesh";
+    }
+
+    /**
+     * Creates the service registry from the receiver configuration and starts it. Checked exceptions raised while
+     * doing so are rethrown as-is by {@code @SneakyThrows}.
+     *
+     * @param manager the module manager, not used by this analyzer.
+     * @param config  the receiver configuration the registry is built from.
+     */
+    @Override
+    @SneakyThrows
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
+        serviceRegistry = new K8SServiceRegistry(config);
+        serviceRegistry.start();
+    }
+
+    /**
+     * Analyzes one TCP access log entry.
+     *
+     * @param result     metrics already produced for this entry; when not empty it is returned untouched.
+     * @param identifier identifier of the log stream, not used by this analyzer.
+     * @param entry      the TCP access log entry to analyze.
+     * @param role       role of the Envoy instance that reported the entry.
+     * @return the given {@code result} when it is not empty, otherwise the metrics built from the entry; an empty
+     * list when the registry is empty or the role is neither {@code PROXY} nor {@code SIDECAR}.
+     */
+    @Override
+    public List<ServiceMeshMetric.Builder> analysis(
+        final List<ServiceMeshMetric.Builder> result,
+        final StreamAccessLogsMessage.Identifier identifier,
+        final TCPAccessLogEntry entry,
+        final Role role
+    ) {
+        if (isNotEmpty(result)) {
+            return result;
+        }
+        if (serviceRegistry.isEmpty()) {
+            return Collections.emptyList();
+        }
+        switch (role) {
+            case PROXY:
+                return analyzeProxy(entry);
+            case SIDECAR:
+                return analyzeSideCar(entry);
+            default:
+                return Collections.emptyList();
+        }
+    }
+
+    /**
+     * Builds the metric for an entry reported by a sidecar. The direction is taken from the upstream cluster name:
+     * an {@code inbound|} prefix produces a downstream (server side) metric, an {@code outbound|} prefix produces
+     * an upstream (client side) metric, anything else produces nothing.
+     *
+     * @param entry the TCP access log entry to analyze.
+     * @return the metrics built from the entry, possibly empty.
+     */
+    protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) {
+        // Message getters of protobuf generated classes return a default instance instead of null,
+        // so presence has to be tested through the has*() accessor.
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+        // An unset cluster is an empty string: it matches none of the prefixes below and yields no metric.
+        final String cluster = properties.getUpstreamCluster();
+
+        final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
+
+        final Address downstreamRemoteAddress =
+            properties.hasDownstreamDirectRemoteAddress()
+                ? properties.getDownstreamDirectRemoteAddress()
+                : properties.getDownstreamRemoteAddress();
+        final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
+        final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+        final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+        if (cluster.startsWith("inbound|")) {
+            // Server side
+            final ServiceMeshMetric.Builder metrics;
+            if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                // Ingress -> sidecar(server side)
+                // Mesh telemetry without source, the relation would be generated.
+                metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics);
+            } else {
+                // sidecar -> sidecar(server side)
+                metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics);
+            }
+            sources.add(metrics);
+        } else if (cluster.startsWith("outbound|")) {
+            // sidecar(client side) -> sidecar
+            final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+            final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
+
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics();
+
+            log.debug("Transformed sidecar(client side)->sidecar outbound mesh metric {}", metric);
+            sources.add(metric);
+        }
+
+        return sources;
+    }
+
+    /**
+     * Builds the metrics for an entry reported by a proxy: one downstream metric from the remote peer to the
+     * proxy, and one upstream metric from the proxy to the upstream peer.
+     *
+     * @param entry the TCP access log entry to analyze.
+     * @return the two metrics built from the entry, or an empty list when the entry has no common properties.
+     */
+    protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) {
+        // Message getters of protobuf generated classes return a default instance instead of null,
+        // so presence has to be tested through the has*() accessor.
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+        final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
+            properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
+        final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+        // NOTE(review): these getters presumably never return null either, so this guard looks like a no-op.
+        // It is kept unchanged because switching to has*() checks would change which entries are reported.
+        if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
+            return Collections.emptyList();
+        }
+
+        final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
+        final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
+
+        final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
+        final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
+
+        final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics();
+
+        log.debug("Transformed ingress inbound mesh metric {}", metric);
+        result.add(metric);
+
+        final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
+
+        final ServiceMeshMetric.Builder outboundMetric =
+            newAdapter(entry, ingress, targetService)
+                .adaptToUpstreamMetrics()
+                // Can't parse it from tls properties, leave it to Server side.
+                .setTlsMode(NON_TLS);
+
+        log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+        result.add(outboundMetric);
+
+        return result;
+    }
+
+    /**
+     * Looks up the service owning the given IP address in the service registry.
+     *
+     * @param ip the IP address to look up.
+     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+     */
+    protected ServiceMetaInfo find(String ip) {
+        return serviceRegistry.findService(ip);
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
new file mode 100644
index 0000000..f317123
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.TextFormat;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY;
+
+/**
+ * TCP access log analyzer that builds service mesh metrics from the peer metadata found in the filter state
+ * objects of an access log entry, using the node metadata of the stream identifier as the current service.
+ */
+@Slf4j
+public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer {
+    /**
+     * Name of the field mapping file handed to {@link FieldsHelper} during initialization.
+     */
+    protected String fieldMappingFile = "metadata-service-mapping.yaml";
+
+    /**
+     * Receiver configuration captured during initialization; provides the service meta info factory.
+     */
+    protected EnvoyMetricReceiverConfig config;
+
+    /**
+     * @return the name of this analyzer, {@code tcp-mx-mesh}.
+     */
+    @Override
+    public String name() {
+        return "tcp-mx-mesh";
+    }
+
+    /**
+     * Stores the configuration and initializes the {@link FieldsHelper} singleton with the field mapping file.
+     *
+     * @param manager the module manager, not used by this analyzer.
+     * @param config  the receiver configuration.
+     * @throws ModuleStartException when the field mapping cannot be initialized.
+     */
+    @Override
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
+        this.config = config;
+        try {
+            FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
+        } catch (final Exception e) {
+            // Report the file that was actually used, the field may have been changed by a subclass.
+            throw new ModuleStartException("Failed to load " + fieldMappingFile, e);
+        }
+    }
+
+    /**
+     * Analyzes one TCP access log entry. A metric is produced for each filter state object stored under the
+     * upstream or downstream key; for the {@code PROXY} role an inbound metric with an unknown source is added
+     * when no downstream metric was produced.
+     *
+     * @param result     metrics already produced for this entry; when not empty it is returned untouched.
+     * @param identifier identifier of the log stream, its node metadata describes the current service.
+     * @param entry      the TCP access log entry to analyze.
+     * @param role       role of the Envoy instance that reported the entry.
+     * @return the given {@code result} when it is not empty, otherwise a list with the metrics built from the
+     * entry; an empty list when the entry has no common properties, no filter state objects, or the current
+     * service cannot be built from the identifier.
+     */
+    @Override
+    public List<ServiceMeshMetric.Builder> analysis(
+        final List<ServiceMeshMetric.Builder> result,
+        final StreamAccessLogsMessage.Identifier identifier,
+        final TCPAccessLogEntry entry,
+        final Role role
+    ) {
+        if (isNotEmpty(result)) {
+            return result;
+        }
+        if (!entry.hasCommonProperties()) {
+            return Collections.emptyList();
+        }
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+        if (stateMap.isEmpty()) {
+            return Collections.emptyList();
+        }
+        final ServiceMetaInfo currSvc;
+        try {
+            currSvc = adaptToServiceMetaInfo(identifier);
+        } catch (Exception e) {
+            log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
+            return Collections.emptyList();
+        }
+
+        // The incoming list is empty or null at this point and may be immutable (for instance
+        // Collections.emptyList()), so the metrics are collected into a list owned by this method.
+        final List<ServiceMeshMetric.Builder> collected = new ArrayList<>();
+        final AtomicBoolean downstreamExists = new AtomicBoolean();
+        stateMap.forEach((key, value) -> {
+            if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
+                return;
+            }
+            final ServiceMetaInfo svc;
+            try {
+                svc = adaptToServiceMetaInfo(value);
+            } catch (Exception e) {
+                // encodeToString yields the Base64 text; the exception is passed last so its stack trace is logged.
+                log.error(
+                    "Fail to parse metadata {} to FlatNode",
+                    Base64.getEncoder().encodeToString(value.toByteArray()),
+                    e
+                );
+                return;
+            }
+            final ServiceMeshMetric.Builder metrics;
+            switch (key) {
+                case UPSTREAM_KEY:
+                    metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    collected.add(metrics);
+                    break;
+                case DOWNSTREAM_KEY:
+                    metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    collected.add(metrics);
+                    downstreamExists.set(true);
+                    break;
+            }
+        });
+        if (role.equals(Role.PROXY) && !downstreamExists.get()) {
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
+            if (log.isDebugEnabled()) {
+                log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
+            }
+            collected.add(metric);
+        }
+        return collected;
+    }
+
+    /**
+     * Wraps a filter state object into a {@link ServiceMetaInfo}.
+     *
+     * @param value the filter state object.
+     * @return the service metadata backed by the given value.
+     * @throws Exception when the adapter cannot be created from the value.
+     */
+    protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
+        return new ServiceMetaInfoAdapter(value);
+    }
+
+    /**
+     * Builds the {@link ServiceMetaInfo} of the reporting node from the node metadata of the stream identifier,
+     * using the factory provided by the configuration.
+     *
+     * @param identifier the identifier of the access log stream.
+     * @return the service metadata built from the node metadata.
+     * @throws Exception when the factory fails to build the metadata.
+     */
+    protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
+        return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata());
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
new file mode 100644
index 0000000..5290deb
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index c1da14b..d347110 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -234,6 +234,8 @@ public class TelemetryDataDispatcher {
                 return RequestType.gRPC;
             case HTTP:
                 return RequestType.HTTP;
+            case TCP:
+                return RequestType.TCP;
             case UNRECOGNIZED:
             default:
                 return RequestType.RPC;
@@ -248,6 +250,9 @@ public class TelemetryDataDispatcher {
             case HTTP:
                 // HTTP in component-libraries.yml
                 return 49;
+            case TCP:
+                // TCP in component-libraries.yml
+                return 107;
             case UNRECOGNIZED:
             default:
                 // RPC in component-libraries.yml