You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/05/07 13:54:54 UTC
[skywalking] 01/01: bugfix: Envoy error logs are not persisted when
no metrics are generated
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch bugfix/envoy-error-logs
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f5df4d91904bf2b70b9273c2caedba98d45fe017
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri May 7 21:54:25 2021 +0800
bugfix: Envoy error logs are not persisted when no metrics are generated
---
CHANGES.md | 3 +-
.../src/main/resources/lal/envoy-als.yaml | 4 +-
.../envoy/AccessLogServiceGRPCHandler.java | 14 ++++--
.../receiver/envoy/als/AccessLogAnalyzer.java | 20 ++++++++-
.../envoy/als/LogEntry2MetricsAdapter.java | 2 +-
.../als/k8s/K8sALSServiceMeshHTTPAnalysis.java | 31 +++++++-------
.../envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java | 27 ++++++------
.../envoy/als/tcp/TCPLogEntry2MetricsAdapter.java | 2 +-
.../als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java | 33 +++++++-------
.../tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java | 26 +++++------
.../envoy/persistence/LogsPersistence.java | 40 ++++++-----------
...ogsPersistence.java => TCPLogsPersistence.java} | 50 ++++++++--------------
...ver.receiver.envoy.als.tcp.TCPAccessLogAnalyzer | 1 +
.../als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java | 29 ++++++-------
14 files changed, 140 insertions(+), 142 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index f80e5cd..cee0732 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -36,7 +36,8 @@ Release Notes.
* CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638
* Fix: MAL function would miss samples name after creating new samples.
* perf: use iterator.remove() to remove modulesWithoutProvider
-* Support analyzing Envoy TCP access logs.
+* Support analyzing Envoy TCP access logs and persist error TCP logs.
+* Fix: Envoy error logs are not persisted when no metrics are generated
#### UI
* Add logo for kong plugin.
diff --git a/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml b/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
index 4be3213..96a7c1a 100644
--- a/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
+++ b/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
@@ -24,7 +24,9 @@ rules:
abort {}
}
extractor {
- tag 'status.code': parsed?.response?.responseCode as int
+ if (parsed?.response?.responseCode) {
+ tag 'status.code': parsed?.response?.responseCode as int
+ }
tag 'response.flag': parsed?.commonProperties?.responseFlags?.keySet()
}
sink {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index f3bc5ef..14c052e 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -31,7 +31,9 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -129,11 +131,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
- List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+ AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
result = analysis.analysis(result, identifier, log, role);
}
- sourceResult.addAll(result);
+ if (CollectionUtils.isNotEmpty(result.getMetrics())) {
+ sourceResult.addAll(result.getMetrics());
+ }
}
break;
@@ -141,11 +145,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
- List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+ AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
result = analyzer.analysis(result, identifier, tcpLog, role);
}
- sourceResult.addAll(result);
+ if (CollectionUtils.isNotEmpty(result.getMetrics())) {
+ sourceResult.addAll(result.getMetrics());
+ }
}
break;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
index 4dcf883..36ea04c 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List;
+import lombok.Builder;
+import lombok.Data;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -42,8 +44,8 @@ public interface AccessLogAnalyzer<E> {
* @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
- List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final E entry,
final Role role
@@ -65,4 +67,18 @@ public interface AccessLogAnalyzer<E> {
}
return defaultRole;
}
+
+ @Data
+ @Builder
+ class Result {
+ /**
+ * The service representing the Envoy node.
+ */
+ private ServiceMetaInfo service;
+
+ /**
+ * The analyzed metrics result.
+ */
+ private List<ServiceMeshMetric.Builder> metrics;
+ }
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
index 7b6f8fb..63da419 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -91,7 +91,7 @@ public class LogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client);
}
- protected ServiceMeshMetric.Builder adaptCommonPart() {
+ public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties();
final String endpoint = endpoint();
int responseCode = entry.getResponse().getResponseCode().getValue();
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
index e761ff7..f521a3e 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -60,17 +59,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
+ if (isNotEmpty(result.getMetrics())) {
return result;
}
if (serviceRegistry.isEmpty()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
switch (role) {
case PROXY:
@@ -79,17 +78,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
return analyzeSideCar(entry);
}
- return Collections.emptyList();
+ return Result.builder().build();
}
- protected List<ServiceMeshMetric.Builder> analyzeSideCar(final HTTPAccessLogEntry entry) {
+ protected Result analyzeSideCar(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final AccessLogCommon properties = entry.getCommonProperties();
final String cluster = properties.getUpstreamCluster();
if (isBlank(cluster)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
@@ -101,7 +100,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
if (!isValid(downstreamRemoteAddress) || !isValid(downstreamLocalAddress)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
@@ -125,7 +124,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
// sidecar(client side) -> sidecar
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (!isValid(upstreamRemoteAddress)) {
- return sources;
+ return Result.builder().metrics(sources).service(localService).build();
}
final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
@@ -135,12 +134,12 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
sources.add(metric);
}
- return sources;
+ return Result.builder().metrics(sources).service(localService).build();
}
- protected List<ServiceMeshMetric.Builder> analyzeProxy(final HTTPAccessLogEntry entry) {
+ protected Result analyzeProxy(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final AccessLogCommon properties = entry.getCommonProperties();
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
@@ -148,7 +147,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (!isValid(downstreamLocalAddress) || !isValid(downstreamRemoteAddress) || !isValid(upstreamRemoteAddress)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
@@ -175,7 +174,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
result.add(outboundMetric);
- return result;
+ return Result.builder().metrics(result).service(ingress).build();
}
/**
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
index 0cb752e..81a3d10 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,31 +68,32 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
- }
- final AccessLogCommon properties = entry.getCommonProperties();
- final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
- if (stateMap.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
final ServiceMetaInfo currSvc;
try {
currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
- return Collections.emptyList();
+ return previousResult;
+ }
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+ if (stateMap.isEmpty()) {
+ return Result.builder().service(currSvc).build();
}
+ final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
@@ -131,7 +132,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
}
result.add(metric);
}
- return result;
+ return Result.builder().metrics(result).service(currSvc).build();
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
index e7d60d1..4a1882b 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
@@ -85,7 +85,7 @@ public class TCPLogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client);
}
- protected ServiceMeshMetric.Builder adaptCommonPart() {
+ public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties();
final ConnectionProperties connectionProperties = entry.getConnectionProperties();
final String tlsMode = parseTLS(properties.getTlsProperties());
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
index c3a8f36..fffbd83 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -59,36 +58,36 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (serviceRegistry.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
switch (role) {
case PROXY:
- return analyzeProxy(entry);
+ return analyzeProxy(previousResult, entry);
case SIDECAR:
- return analyzeSideCar(entry);
+ return analyzeSideCar(previousResult, entry);
}
- return Collections.emptyList();
+ return previousResult;
}
- protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) {
+ protected Result analyzeSideCar(final Result previousResult, final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
- return Collections.emptyList();
+ return previousResult;
}
final String cluster = properties.getUpstreamCluster();
if (cluster == null) {
- return Collections.emptyList();
+ return previousResult;
}
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
@@ -128,20 +127,20 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
sources.add(metric);
}
- return sources;
+ return Result.builder().metrics(sources).service(localService).build();
}
- protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) {
+ protected Result analyzeProxy(final Result previousResult, final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
- return Collections.emptyList();
+ return previousResult;
}
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
- return Collections.emptyList();
+ return previousResult;
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
@@ -168,7 +167,7 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
result.add(outboundMetric);
- return result;
+ return Result.builder().metrics(result).service(ingress).build();
}
/**
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
index 1f4cdcc..eeb9e3e 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
@@ -23,6 +23,7 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
@@ -67,31 +68,32 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
- }
- final AccessLogCommon properties = entry.getCommonProperties();
- final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
- if (stateMap.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
final ServiceMetaInfo currSvc;
try {
currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
- return Collections.emptyList();
+ return previousResult;
+ }
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+ if (stateMap.isEmpty()) {
+ return Result.builder().service(currSvc).build();
}
+ final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
@@ -130,7 +132,7 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz
}
result.add(metric);
}
- return result;
+ return Result.builder().metrics(result).service(currSvc).build();
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
index 132911b..4f1e266 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
-import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.io.IOException;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
@@ -35,7 +31,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
try {
- result.stream()
- .findFirst()
- .ifPresent(metrics -> {
- try {
- final LogData logData = convertToLogData(entry, metrics);
- logAnalyzerService.doAnalysis(logData);
- } catch (IOException e) {
- log.error(
- "Failed to parse error log entry to log data: {}",
- TextFormat.shortDebugString(entry),
- e
- );
- }
- });
+ final LogData logData = convertToLogData(entry, result);
+ logAnalyzerService.doAnalysis(logData);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
@@ -92,16 +78,16 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
- final ServiceMeshMetric.Builder metrics) throws IOException {
- final boolean isServerSide = metrics.getDetectPoint() == DetectPoint.server;
- final String svc = isServerSide ? metrics.getDestServiceName() : metrics.getSourceServiceName();
- final String svcInst = isServerSide ? metrics.getDestServiceInstance() : metrics.getSourceServiceInstance();
+ final Result result) throws Exception {
+
+ final ServiceMetaInfo service = result.getService();
+
+ final ServiceMeshMetric.Builder metrics = new LogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
- .setService(svc)
- .setServiceInstance(svcInst)
- .setEndpoint(metrics.getEndpoint())
+ .setService(service.getServiceName())
+ .setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
similarity index 64%
copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
index 132911b..8d3d0b6 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
-import com.google.protobuf.TextFormat;
-import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.io.IOException;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
@@ -34,8 +30,10 @@ import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
+import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPLogEntry2MetricsAdapter;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
@@ -43,7 +41,7 @@ import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.to
* {@code LogsPersistence} analyzes the error logs and persists them to the log system.
*/
@Slf4j
-public class LogsPersistence implements ALSHTTPAnalysis {
+public class TCPLogsPersistence implements TCPAccessLogAnalyzer {
private ILogAnalyzerService logAnalyzerService;
@Override
@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
- final HTTPAccessLogEntry entry,
+ final TCPAccessLogEntry entry,
final Role role
) {
try {
- result.stream()
- .findFirst()
- .ifPresent(metrics -> {
- try {
- final LogData logData = convertToLogData(entry, metrics);
- logAnalyzerService.doAnalysis(logData);
- } catch (IOException e) {
- log.error(
- "Failed to parse error log entry to log data: {}",
- TextFormat.shortDebugString(entry),
- e
- );
- }
- });
+ final LogData logData = convertToLogData(entry, result);
+ logAnalyzerService.doAnalysis(logData);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
@@ -91,17 +77,17 @@ public class LogsPersistence implements ALSHTTPAnalysis {
return prev;
}
- public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
- final ServiceMeshMetric.Builder metrics) throws IOException {
- final boolean isServerSide = metrics.getDetectPoint() == DetectPoint.server;
- final String svc = isServerSide ? metrics.getDestServiceName() : metrics.getSourceServiceName();
- final String svcInst = isServerSide ? metrics.getDestServiceInstance() : metrics.getSourceServiceInstance();
+ public LogData convertToLogData(final TCPAccessLogEntry logEntry,
+ final Result result) throws Exception {
+
+ final ServiceMetaInfo service = result.getService();
+
+ final ServiceMeshMetric.Builder metrics = new TCPLogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
- .setService(svc)
- .setServiceInstance(svcInst)
- .setEndpoint(metrics.getEndpoint())
+ .setService(service.getServiceName())
+ .setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
index 5290deb..e207746 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
@@ -18,3 +18,4 @@
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
+org.apache.skywalking.oap.server.receiver.envoy.persistence.TCPLogsPersistence
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
index 2ebf581..abe7073 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
@@ -23,13 +23,12 @@ import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.junit.Assert;
@@ -78,16 +77,16 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+ AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
- Assert.assertEquals(2, result.size());
+ Assert.assertEquals(2, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
Assert.assertEquals("ingress", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
- ServiceMeshMetric.Builder outgoing = result.get(1);
+ ServiceMeshMetric.Builder outgoing = result.getMetrics().get(1);
Assert.assertEquals("ingress", outgoing.getSourceServiceName());
Assert.assertEquals("productpage", outgoing.getDestServiceName());
Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
@@ -100,11 +99,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("", incoming.getSourceServiceName());
Assert.assertEquals("productpage", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -117,11 +116,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("review", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -134,11 +133,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("detail", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());