You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/15 13:08:49 UTC
[incubator-skywalking] branch envoy-access-log updated: Finish
codes of side car.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch envoy-access-log
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/envoy-access-log by this push:
new 8479d87 Finish codes of side car.
8479d87 is described below
commit 8479d879d3abcf35823ffd8a3fc373c920b98c3f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Apr 15 21:08:38 2019 +0800
Finish codes of side car.
---
.../envoy/als/K8sALSServiceMeshHTTPAnalysis.java | 106 ++++++++++++++++++++-
.../server/receiver/envoy/als/ServiceMetaInfo.java | 16 +++-
.../mesh/ServiceMeshMetricDataDecorator.java | 17 +++-
.../receiver/mesh/TelemetryDataDispatcher.java | 30 +++---
4 files changed, 149 insertions(+), 20 deletions(-)
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
index 3ff7a35..0aff88c 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig
import org.slf4j.*;
/**
- * Analysis log based on
+ * Analysis log based on ingress and mesh scenarios.
*
* @author wusheng
*/
@@ -52,14 +52,109 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
switch (role) {
case PROXY:
analysisProxy(identifier, entry);
+ break;
case SIDECAR:
-
+ return analysisSideCar(identifier, entry);
}
return Collections.emptyList();
}
-
+ protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier identifier,
+ HTTPAccessLogEntry entry) {
+ List<Source> sources = new ArrayList<>();
+ AccessLogCommon properties = entry.getCommonProperties();
+ if (properties != null) {
+ String cluster = properties.getUpstreamCluster();
+ if (cluster != null) {
+ long startTime = formatAsLong(properties.getStartTime());
+ long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+ HTTPRequestProperties request = entry.getRequest();
+ String endpoint = "/";
+ Protocol protocol = Protocol.HTTP;
+ if (request != null) {
+ endpoint = request.getPath();
+ String schema = request.getScheme();
+ if (schema.equals("http") || schema.equals("https")) {
+ protocol = Protocol.HTTP;
+ } else {
+ protocol = Protocol.gRPC;
+ }
+ }
+ HTTPResponseProperties response = entry.getResponse();
+ int responseCode = 200;
+ if (response != null) {
+ responseCode = response.getResponseCode().getValue();
+ }
+ boolean status = responseCode >= 200 && responseCode < 400;
+
+ Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
+ ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress(),
+ downstreamRemoteAddress.getSocketAddress().getPortValue());
+ Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+ ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress(),
+ downstreamLocalAddress.getSocketAddress().getPortValue());
+ if (cluster.startsWith("inbound|")) {
+ // Server side
+ if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+ // Ingress -> sidecar(server side)
+ // Mesh telemetry without source, the relation would be generated.
+ ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+ .setEndTime(startTime + duration)
+ .setDestServiceName(localService.getServiceName())
+ .setDestServiceInstance(localService.getServiceInstanceName())
+ .setEndpoint(endpoint).setLatency((int)duration)
+ .setResponseCode(Math.toIntExact(responseCode))
+ .setStatus(status).setProtocol(protocol)
+ .setDetectPoint(DetectPoint.server)
+ .build();
+
+ logger.debug("Transformed ingress->sidecar inbound mesh metric {}", metric);
+ forward(metric);
+ } else {
+ // sidecar -> sidecar(server side)
+ ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+ .setEndTime(startTime + duration)
+ .setSourceServiceName(downstreamService.getServiceName())
+ .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+ .setDestServiceName(localService.getServiceName())
+ .setDestServiceInstance(localService.getServiceInstanceName())
+ .setEndpoint(endpoint).setLatency((int)duration)
+ .setResponseCode(Math.toIntExact(responseCode))
+ .setStatus(status).setProtocol(protocol)
+ .setDetectPoint(DetectPoint.server)
+ .build();
+
+ logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+ forward(metric);
+ }
+ } else if (cluster.startsWith("outbound|")) {
+ // sidecar(client side) -> sidecar
+ Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+ ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress(),
+ upstreamRemoteAddress.getSocketAddress().getPortValue());
+
+ ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+ .setEndTime(startTime + duration)
+ .setSourceServiceName(downstreamService.getServiceName())
+ .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+ .setDestServiceName(destService.getServiceName())
+ .setDestServiceInstance(destService.getServiceInstanceName())
+ .setEndpoint(endpoint).setLatency((int)duration)
+ .setResponseCode(Math.toIntExact(responseCode))
+ .setStatus(status).setProtocol(protocol)
+ .setDetectPoint(DetectPoint.server)
+ .build();
+
+ logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+ forward(metric);
+
+ }
+ }
+ }
+ return sources;
+ }
protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier,
HTTPAccessLogEntry entry) {
@@ -153,6 +248,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
return prev;
}
+ /**
+ * @param ip
+ * @param port
+ * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+ */
protected ServiceMetaInfo find(String ip, int port) {
//TODO: go through API server to get target service info
// If can't get service or service instance name, set `UNKNOWN` string.
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 9294a75..2b39e20 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
-import java.util.List;
+import java.util.*;
import lombok.*;
/**
@@ -46,5 +46,19 @@ public class ServiceMetaInfo {
private String value;
}
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ ServiceMetaInfo info = (ServiceMetaInfo)o;
+ return Objects.equals(serviceName, info.serviceName) &&
+ Objects.equals(serviceInstanceName, info.serviceInstanceName);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(serviceName, serviceInstanceName);
+ }
+
public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN");
}
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
index 6d3be3b..f3ea308 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.aop.server.receiver.mesh;
import com.google.gson.JsonObject;
import org.apache.skywalking.apm.network.common.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
@@ -46,12 +47,18 @@ public class ServiceMeshMetricDataDecorator {
boolean isRegistered = true;
sourceServiceId = origin.getSourceServiceId();
if (sourceServiceId == Const.NONE) {
- sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(origin.getSourceServiceName(), null);
- if (sourceServiceId != Const.NONE) {
- getNewDataBuilder().setSourceServiceId(sourceServiceId);
- } else {
- isRegistered = false;
+ String sourceServiceName = origin.getSourceServiceName();
+ // sourceServiceName is optional now,
+ // which means only generate dest service traffic.
+ if (!StringUtil.isEmpty(sourceServiceName)) {
+ sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(sourceServiceName, null);
+ if (sourceServiceId != Const.NONE) {
+ getNewDataBuilder().setSourceServiceId(sourceServiceId);
+ } else {
+ isRegistered = false;
+ }
}
+ // No service name, service instance will be ignored too.
}
sourceServiceInstanceId = origin.getSourceServiceInstanceId();
if (sourceServiceId != Const.NONE && sourceServiceInstanceId == Const.NONE) {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 4731d48..da142b0 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -98,8 +98,13 @@ public class TelemetryDataDispatcher {
toServiceInstance(decorator, minuteTimeBucket);
toEndpoint(decorator, minuteTimeBucket);
}
- toServiceRelation(decorator, minuteTimeBucket);
- toServiceInstanceRelation(decorator, minuteTimeBucket);
+
+ int sourceServiceId = metric.getSourceServiceId();
+ // Don't generate relation, if no source.
+ if (sourceServiceId != Const.NONE) {
+ toServiceRelation(decorator, minuteTimeBucket);
+ toServiceInstanceRelation(decorator, minuteTimeBucket);
+ }
}
private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
@@ -108,20 +113,23 @@ public class TelemetryDataDispatcher {
int heartbeatCycle = 10000;
// source
int instanceId = metric.getSourceServiceInstanceId();
- ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
- if (Objects.nonNull(serviceInstanceInventory)) {
- if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
- // trigger heartbeat every 10s.
- SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
- SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ // Don't generate source heartbeat, if no source.
+ if (instanceId != Const.NONE) {
+ ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+ if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+ // trigger heartbeat every 10s.
+ SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ }
+ } else {
+ logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}
- } else {
- logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}
// dest
instanceId = metric.getDestServiceInstanceId();
- serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+ ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
// trigger heartbeat every 10s.