You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/11 08:26:49 UTC
[skywalking] 01/01: feature: Envoy access log receiver supports TCP
logs
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/envoy-tcp-log
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 20fa717d26b93cd947f828ea00a865e3488dec0e
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sun Apr 11 14:53:05 2021 +0800
feature: Envoy access log receiver supports TCP logs
---
apm-protocol/apm-network/src/main/proto | 2 +-
.../src/main/resources/application.yml | 1 +
.../src/main/resources/component-libraries.yml | 3 +
.../oap/server/core/source/RequestType.java | 3 +-
.../envoy/AccessLogServiceGRPCHandler.java | 33 +++-
.../receiver/envoy/EnvoyMetricReceiverConfig.java | 8 +
.../server/receiver/envoy/als/ALSHTTPAnalysis.java | 31 +---
.../receiver/envoy/als/AbstractALSAnalyzer.java | 20 ---
...ALSHTTPAnalysis.java => AccessLogAnalyzer.java} | 32 ++--
.../envoy/als/LogEntry2MetricsAdapter.java | 10 +-
.../receiver/envoy/als/k8s/K8SServiceRegistry.java | 4 +-
.../AbstractTCPAccessLogAnalyzer.java} | 32 +---
.../envoy/als/tcp/TCPAccessLogAnalyzer.java} | 18 +-
.../envoy/als/tcp/TCPLogEntry2MetricsAdapter.java | 112 +++++++++++++
.../als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java | 181 +++++++++++++++++++++
.../tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java | 144 ++++++++++++++++
...ver.receiver.envoy.als.tcp.TCPAccessLogAnalyzer | 20 +++
.../receiver/mesh/TelemetryDataDispatcher.java | 5 +
18 files changed, 548 insertions(+), 111 deletions(-)
diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index ce9e4e8..9a689b0 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit ce9e4e8bd9e552443cc970df67ee25f17ff0d3b8
+Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 29c6754..3d50506 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -325,6 +325,7 @@ envoy-metric:
default:
acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true}
alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""}
+ alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""}
# `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata,
# the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}`
# to append the version number to the service name.
diff --git a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
index 61fc318..646dcc4 100755
--- a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
+++ b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml
@@ -350,6 +350,9 @@ Apache-CXF:
dolphinscheduler:
id: 106
languages: Java
+tcp:
+ id: 107
+ languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
index 3298da0..6eea865 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
@@ -29,5 +29,6 @@ public enum RequestType {
/**
* Logic request only.
*/
- LOGIC
+ LOGIC,
+ TCP
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index bce63fb..f3bc5ef 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.envoy;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
@@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory;
public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
+ private final List<TCPAccessLogAnalyzer> envoyTCPAnalysisList;
private final CounterMetrics counter;
private final HistogramMetrics histogram;
@@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
public AccessLogServiceGRPCHandler(ModuleManager manager,
EnvoyMetricReceiverConfig config) throws ModuleStartException {
ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
+ ServiceLoader<TCPAccessLogAnalyzer> alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class);
envoyHTTPAnalysisList = new ArrayList<>();
for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
@@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
}
}
}
+ envoyTCPAnalysisList = new ArrayList<>();
+ for (String analyzerName : config.getAlsTCPAnalysis()) {
+ for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) {
+ if (analyzerName.equals(tcpAnalyzer.name())) {
+ tcpAnalyzer.init(manager, config);
+ envoyTCPAnalysisList.add(tcpAnalyzer);
+ }
+ }
+ }
- LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
+ LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList);
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
counter = metricCreator.createCounter(
@@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
.getId(), role, logCase, message);
}
+ List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
switch (logCase) {
case HTTP_LOGS:
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
- List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
@@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
sourceResult.addAll(result);
}
- sourceDispatcherCounter.inc(sourceResult.size());
- sourceResult.forEach(TelemetryDataDispatcher::process);
+ break;
+ case TCP_LOGS:
+ StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
+
+ for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
+ List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+ for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
+ result = analyzer.analysis(result, identifier, tcpLog, role);
+ }
+ sourceResult.addAll(result);
+ }
+
break;
}
+ sourceDispatcherCounter.inc(sourceResult.size());
+ sourceResult.forEach(TelemetryDataDispatcher::process);
} finally {
timer.finish();
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
index 94f8c45..e10d95a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
@@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
@Getter
private boolean acceptMetricsService = false;
private String alsHTTPAnalysis;
+ private String alsTCPAnalysis;
@Getter
private String k8sServiceNameRule;
@@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
}
+ public List<String> getAlsTCPAnalysis() {
+ if (Strings.isNullOrEmpty(alsTCPAnalysis)) {
+ return Collections.emptyList();
+ }
+ return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
+ }
+
public List<Rule> rules() throws ModuleStartException {
return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy"));
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 7058e30..ce69eaa 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -19,38 +19,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.util.List;
-import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
/**
* Analysis source metrics from ALS
*/
-public interface ALSHTTPAnalysis {
- String name();
-
- void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
-
- /**
- * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
- *
- * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
- *
- * @param result of the previous analyzer.
- * @param identifier of the Envoy node where the logs are emitted.
- * @param entry the log entry.
- * @param role the role of the Envoy node where the logs are emitted.
- * @return the analysis results.
- */
- List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
- final StreamAccessLogsMessage.Identifier identifier,
- final HTTPAccessLogEntry entry,
- final Role role
- );
-
- Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+public interface ALSHTTPAnalysis extends AccessLogAnalyzer<HTTPAccessLogEntry> {
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
index 0ccc147..2bb71af 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -18,33 +18,13 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
-import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@Slf4j
public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
- @Override
- public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
- if (alsIdentifier == null) {
- return defaultRole;
- }
- if (!alsIdentifier.hasNode()) {
- return defaultRole;
- }
- final Node node = alsIdentifier.getNode();
- final String id = node.getId();
- if (id.startsWith("router~")) {
- return Role.PROXY;
- } else if (id.startsWith("sidecar~")) {
- return Role.SIDECAR;
- }
- return defaultRole;
- }
-
/**
* Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
*
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
similarity index 70%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
index 7058e30..4dcf883 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
-import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@@ -26,10 +26,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-/**
- * Analysis source metrics from ALS
- */
-public interface ALSHTTPAnalysis {
+public interface AccessLogAnalyzer<E> {
String name();
void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
@@ -39,18 +36,33 @@ public interface ALSHTTPAnalysis {
*
* To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
*
- * @param result of the previous analyzer.
+ * @param result of the previous analyzer.
* @param identifier of the Envoy node where the logs are emitted.
- * @param entry the log entry.
- * @param role the role of the Envoy node where the logs are emitted.
+ * @param entry the log entry.
+ * @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
- final HTTPAccessLogEntry entry,
+ final E entry,
final Role role
);
- Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
+ default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) {
+ if (alsIdentifier == null) {
+ return defaultRole;
+ }
+ if (!alsIdentifier.hasNode()) {
+ return defaultRole;
+ }
+ final Node node = alsIdentifier.getNode();
+ final String id = node.getId();
+ if (id.startsWith("router~")) {
+ return Role.PROXY;
+ } else if (id.startsWith("sidecar~")) {
+ return Role.SIDECAR;
+ }
+ return defaultRole;
+ }
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
index accdf23..0fa1c3a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -139,15 +139,15 @@ public class LogEntry2MetricsAdapter {
return method + ":" + request.getPath();
}
- protected static long formatAsLong(final Timestamp timestamp) {
+ public static long formatAsLong(final Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
}
- protected static long formatAsLong(final Duration duration) {
+ public static long formatAsLong(final Duration duration) {
return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
}
- protected static Protocol requestProtocol(final HTTPRequestProperties request) {
+ public static Protocol requestProtocol(final HTTPRequestProperties request) {
if (request == null) {
return Protocol.HTTP;
}
@@ -158,7 +158,7 @@ public class LogEntry2MetricsAdapter {
return Protocol.gRPC;
}
- protected static String parseTLS(final TLSProperties properties) {
+ public static String parseTLS(final TLSProperties properties) {
if (properties == null) {
return NON_TLS;
}
@@ -183,7 +183,7 @@ public class LogEntry2MetricsAdapter {
* @param responseFlags in the ALS v2
* @return empty string if no internal error code, or literal string representing the code.
*/
- protected static String parseInternalErrorCode(final ResponseFlags responseFlags) {
+ public static String parseInternalErrorCode(final ResponseFlags responseFlags) {
if (responseFlags != null) {
if (responseFlags.getFailedLocalHealthcheck()) {
return "failed_local_healthcheck";
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index 158cd20..aa601ce 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -264,7 +264,7 @@ public class K8SServiceRegistry {
.collect(Collectors.toList());
}
- protected ServiceMetaInfo findService(final String ip) {
+ public ServiceMetaInfo findService(final String ip) {
final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
if (isNull(service)) {
log.debug("Unknown ip {}, ip -> service is null", ip);
@@ -311,7 +311,7 @@ public class K8SServiceRegistry {
});
}
- protected boolean isEmpty() {
+ public boolean isEmpty() {
return ipServiceMetaInfoMap.isEmpty();
}
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
similarity index 60%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
index 0ccc147..0516509 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java
@@ -16,34 +16,16 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.envoy.als;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
-import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
@Slf4j
-public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
-
- @Override
- public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
- if (alsIdentifier == null) {
- return defaultRole;
- }
- if (!alsIdentifier.hasNode()) {
- return defaultRole;
- }
- final Node node = alsIdentifier.getNode();
- final String id = node.getId();
- if (id.startsWith("router~")) {
- return Role.PROXY;
- } else if (id.startsWith("sidecar~")) {
- return Role.SIDECAR;
- }
- return defaultRole;
- }
+public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer {
/**
* Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
@@ -53,11 +35,11 @@ public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
* @param targetService the target/destination service.
* @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
*/
- protected LogEntry2MetricsAdapter newAdapter(
- final HTTPAccessLogEntry entry,
+ protected TCPLogEntry2MetricsAdapter newAdapter(
+ final TCPAccessLogEntry entry,
final ServiceMetaInfo sourceService,
final ServiceMetaInfo targetService) {
- return new LogEntry2MetricsAdapter(entry, sourceService, targetService);
+ return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
similarity index 73%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
index 3298da0..c073b82 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java
@@ -16,18 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.core.source;
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
-/**
- * RPC request type.
- */
-public enum RequestType {
- DATABASE,
- HTTP,
- RPC,
- gRPC,
- /**
- * Logic request only.
- */
- LOGIC
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
+
+public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer<TCPAccessLogEntry> {
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
new file mode 100644
index 0000000..8d1572a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
+
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS;
+
+/**
+ * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders.
+ */
+@RequiredArgsConstructor
+public class TCPLogEntry2MetricsAdapter {
+
+ /**
+ * The access log entry that is to be adapted into metrics builders.
+ */
+ protected final TCPAccessLogEntry entry;
+
+ protected final ServiceMetaInfo sourceService;
+
+ protected final ServiceMetaInfo targetService;
+
+ /**
+ * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
+ *
+ * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+ */
+ public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final long startTime = formatAsLong(properties.getStartTime());
+ final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+ return adaptCommonPart()
+ .setStartTime(startTime)
+ .setEndTime(startTime + duration)
+ .setLatency((int) Math.max(1L, duration))
+ .setDetectPoint(DetectPoint.server);
+ }
+
+ /**
+ * Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}.
+ *
+ * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+ */
+ public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final long startTime = formatAsLong(properties.getStartTime());
+ final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+ final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+ return adaptCommonPart()
+ .setStartTime(outboundStartTime)
+ .setEndTime(outboundEndTime)
+ .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime))
+ .setDetectPoint(DetectPoint.client);
+ }
+
+ protected ServiceMeshMetric.Builder adaptCommonPart() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final String tlsMode = parseTLS(properties.getTlsProperties());
+ final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags());
+
+ final ServiceMeshMetric.Builder builder =
+ ServiceMeshMetric.newBuilder()
+ .setTlsMode(tlsMode)
+ .setProtocol(Protocol.TCP)
+ .setInternalErrorCode(internalErrorCode);
+
+ Optional.ofNullable(sourceService)
+ .map(ServiceMetaInfo::getServiceName)
+ .ifPresent(builder::setSourceServiceName);
+ Optional.ofNullable(sourceService)
+ .map(ServiceMetaInfo::getServiceInstanceName)
+ .ifPresent(builder::setSourceServiceInstance);
+ Optional.ofNullable(targetService)
+ .map(ServiceMetaInfo::getServiceName)
+ .ifPresent(builder::setDestServiceName);
+ Optional.ofNullable(targetService)
+ .map(ServiceMetaInfo::getServiceInstanceName)
+ .ifPresent(builder::setDestServiceInstance);
+
+ return builder;
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
new file mode 100644
index 0000000..97ea3f1
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s;
+
+import io.envoyproxy.envoy.config.core.v3.Address;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+
+/**
+ * Analysis log based on ingress and mesh scenarios.
+ */
+@Slf4j
+public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
+ protected K8SServiceRegistry serviceRegistry;
+
+ @Override
+ public String name() {
+ return "tcp-k8s-mesh";
+ }
+
+ @Override
+ @SneakyThrows
+ public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
+ serviceRegistry = new K8SServiceRegistry(config);
+ serviceRegistry.start();
+ }
+
+ @Override
+ public List<ServiceMeshMetric.Builder> analysis(
+ final List<ServiceMeshMetric.Builder> result,
+ final StreamAccessLogsMessage.Identifier identifier,
+ final TCPAccessLogEntry entry,
+ final Role role
+ ) {
+ if (isNotEmpty(result)) {
+ return result;
+ }
+ if (serviceRegistry.isEmpty()) {
+ return Collections.emptyList();
+ }
+ switch (role) {
+ case PROXY:
+ return analyzeProxy(entry);
+ case SIDECAR:
+ return analyzeSideCar(entry);
+ }
+
+ return Collections.emptyList();
+ }
+
+ protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ if (properties == null) {
+ return Collections.emptyList();
+ }
+ final String cluster = properties.getUpstreamCluster();
+ if (cluster == null) {
+ return Collections.emptyList();
+ }
+
+ final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
+
+ final Address downstreamRemoteAddress =
+ properties.hasDownstreamDirectRemoteAddress()
+ ? properties.getDownstreamDirectRemoteAddress()
+ : properties.getDownstreamRemoteAddress();
+ final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
+ final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+ final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+ if (cluster.startsWith("inbound|")) {
+ // Server side
+ final ServiceMeshMetric.Builder metrics;
+ if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+ // Ingress -> sidecar(server side)
+ // Mesh telemetry without source, the relation would be generated.
+ metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics();
+
+ log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics);
+ } else {
+ // sidecar -> sidecar(server side)
+ metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics();
+
+ log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics);
+ }
+ sources.add(metrics);
+ } else if (cluster.startsWith("outbound|")) {
+ // sidecar(client side) -> sidecar
+ final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+ final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
+
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics();
+
+ log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+ sources.add(metric);
+ }
+
+ return sources;
+ }
+
+ protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ if (properties == null) {
+ return Collections.emptyList();
+ }
+ final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+ final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
+ properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
+ final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+ if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
+ return Collections.emptyList();
+ }
+
+ final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
+ final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
+ final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
+
+ final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
+ final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
+
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics();
+
+ log.debug("Transformed ingress inbound mesh metric {}", metric);
+ result.add(metric);
+
+ final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
+ final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
+
+ final ServiceMeshMetric.Builder outboundMetric =
+ newAdapter(entry, ingress, targetService)
+ .adaptToUpstreamMetrics()
+ // Can't parse it from tls properties, leave it to Server side.
+ .setTlsMode(NON_TLS);
+
+ log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+ result.add(outboundMetric);
+
+ return result;
+ }
+
+ /**
+ * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+ */
+ protected ServiceMetaInfo find(String ip) {
+ return serviceRegistry.findService(ip);
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
new file mode 100644
index 0000000..f317123
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.TextFormat;
+import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper;
+import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
+
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY;
+
+@Slf4j
+public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer {
+ protected String fieldMappingFile = "metadata-service-mapping.yaml";
+
+ protected EnvoyMetricReceiverConfig config;
+
+ @Override
+ public String name() {
+ return "tcp-mx-mesh";
+ }
+
+ @Override
+ public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
+ this.config = config;
+ try {
+ FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
+ } catch (final Exception e) {
+ throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
+ }
+ }
+
+ @Override
+ public List<ServiceMeshMetric.Builder> analysis(
+ final List<ServiceMeshMetric.Builder> result,
+ final StreamAccessLogsMessage.Identifier identifier,
+ final TCPAccessLogEntry entry,
+ final Role role
+ ) {
+ if (isNotEmpty(result)) {
+ return result;
+ }
+ if (!entry.hasCommonProperties()) {
+ return Collections.emptyList();
+ }
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+ if (stateMap.isEmpty()) {
+ return Collections.emptyList();
+ }
+ final ServiceMetaInfo currSvc;
+ try {
+ currSvc = adaptToServiceMetaInfo(identifier);
+ } catch (Exception e) {
+ log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
+ return Collections.emptyList();
+ }
+
+ final AtomicBoolean downstreamExists = new AtomicBoolean();
+ stateMap.forEach((key, value) -> {
+ if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
+ return;
+ }
+ final ServiceMetaInfo svc;
+ try {
+ svc = adaptToServiceMetaInfo(value);
+ } catch (Exception e) {
+ log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray()));
+ return;
+ }
+ final ServiceMeshMetric.Builder metrics;
+ switch (key) {
+ case UPSTREAM_KEY:
+ metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+ }
+ result.add(metrics);
+ break;
+ case DOWNSTREAM_KEY:
+ metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+ }
+ result.add(metrics);
+ downstreamExists.set(true);
+ break;
+ }
+ });
+ if (role.equals(Role.PROXY) && !downstreamExists.get()) {
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
+ }
+ result.add(metric);
+ }
+ return result;
+ }
+
+ protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
+ return new ServiceMetaInfoAdapter(value);
+ }
+
+ protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
+ return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata());
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
new file mode 100644
index 0000000..5290deb
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index c1da14b..d347110 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -234,6 +234,8 @@ public class TelemetryDataDispatcher {
return RequestType.gRPC;
case HTTP:
return RequestType.HTTP;
+ case TCP:
+ return RequestType.TCP;
case UNRECOGNIZED:
default:
return RequestType.RPC;
@@ -248,6 +250,9 @@ public class TelemetryDataDispatcher {
case HTTP:
// HTTP in component-libraries.yml
return 49;
+ case TCP:
+ // TCP in component-libraries.yml
+ return 107;
case UNRECOGNIZED:
default:
// RPC in component-libraries.yml