You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/15 13:08:49 UTC

[incubator-skywalking] branch envoy-access-log updated: Finish codes of side car.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch envoy-access-log
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/envoy-access-log by this push:
     new 8479d87  Finish codes of side car.
8479d87 is described below

commit 8479d879d3abcf35823ffd8a3fc373c920b98c3f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Apr 15 21:08:38 2019 +0800

    Finish codes of side car.
---
 .../envoy/als/K8sALSServiceMeshHTTPAnalysis.java   | 106 ++++++++++++++++++++-
 .../server/receiver/envoy/als/ServiceMetaInfo.java |  16 +++-
 .../mesh/ServiceMeshMetricDataDecorator.java       |  17 +++-
 .../receiver/mesh/TelemetryDataDispatcher.java     |  30 +++---
 4 files changed, 149 insertions(+), 20 deletions(-)

diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
index 3ff7a35..0aff88c 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig
 import org.slf4j.*;
 
 /**
- * Analysis log based on
+ * Analysis log based on ingress and mesh scenarios.
  *
  * @author wusheng
  */
@@ -52,14 +52,109 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
         switch (role) {
             case PROXY:
                 analysisProxy(identifier, entry);
+                break;
             case SIDECAR:
-
+                return analysisSideCar(identifier, entry);
         }
 
         return Collections.emptyList();
     }
 
-    
+    protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier identifier,
+        HTTPAccessLogEntry entry) {
+        List<Source> sources = new ArrayList<>();
+        AccessLogCommon properties = entry.getCommonProperties();
+        if (properties != null) {
+            String cluster = properties.getUpstreamCluster();
+            if (cluster != null) {
+                long startTime = formatAsLong(properties.getStartTime());
+                long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+                HTTPRequestProperties request = entry.getRequest();
+                String endpoint = "/";
+                Protocol protocol = Protocol.HTTP;
+                if (request != null) {
+                    endpoint = request.getPath();
+                    String schema = request.getScheme();
+                    if (schema.equals("http") || schema.equals("https")) {
+                        protocol = Protocol.HTTP;
+                    } else {
+                        protocol = Protocol.gRPC;
+                    }
+                }
+                HTTPResponseProperties response = entry.getResponse();
+                int responseCode = 200;
+                if (response != null) {
+                    responseCode = response.getResponseCode().getValue();
+                }
+                boolean status = responseCode >= 200 && responseCode < 400;
+
+                Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
+                ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress(),
+                    downstreamRemoteAddress.getSocketAddress().getPortValue());
+                Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+                ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress(),
+                    downstreamLocalAddress.getSocketAddress().getPortValue());
+                if (cluster.startsWith("inbound|")) {
+                    // Server side
+                    if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                        // Ingress -> sidecar(server side)
+                        // Mesh telemetry without source, the relation would be generated.
+                        ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                            .setEndTime(startTime + duration)
+                            .setDestServiceName(localService.getServiceName())
+                            .setDestServiceInstance(localService.getServiceInstanceName())
+                            .setEndpoint(endpoint).setLatency((int)duration)
+                            .setResponseCode(Math.toIntExact(responseCode))
+                            .setStatus(status).setProtocol(protocol)
+                            .setDetectPoint(DetectPoint.server)
+                            .build();
+
+                        logger.debug("Transformed ingress->sidecar inbound mesh metric {}", metric);
+                        forward(metric);
+                    } else {
+                        // sidecar -> sidecar(server side)
+                        ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                            .setEndTime(startTime + duration)
+                            .setSourceServiceName(downstreamService.getServiceName())
+                            .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                            .setDestServiceName(localService.getServiceName())
+                            .setDestServiceInstance(localService.getServiceInstanceName())
+                            .setEndpoint(endpoint).setLatency((int)duration)
+                            .setResponseCode(Math.toIntExact(responseCode))
+                            .setStatus(status).setProtocol(protocol)
+                            .setDetectPoint(DetectPoint.server)
+                            .build();
+
+                        logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+                        forward(metric);
+                    }
+                } else if (cluster.startsWith("outbound|")) {
+                    // sidecar(client side) -> sidecar
+                    Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+                    ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress(),
+                        upstreamRemoteAddress.getSocketAddress().getPortValue());
+
+                    ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                        .setEndTime(startTime + duration)
+                        .setSourceServiceName(downstreamService.getServiceName())
+                        .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                        .setDestServiceName(destService.getServiceName())
+                        .setDestServiceInstance(destService.getServiceInstanceName())
+                        .setEndpoint(endpoint).setLatency((int)duration)
+                        .setResponseCode(Math.toIntExact(responseCode))
+                        .setStatus(status).setProtocol(protocol)
+                        .setDetectPoint(DetectPoint.server)
+                        .build();
+
+                    logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+                    forward(metric);
+
+                }
+            }
+        }
+        return sources;
+    }
 
     protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier,
         HTTPAccessLogEntry entry) {
@@ -153,6 +248,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
         return prev;
     }
 
+    /**
+     * @param ip
+     * @param port
+     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+     */
     protected ServiceMetaInfo find(String ip, int port) {
         //TODO: go through API server to get target service info
         // If can't get service or service instance name, set `UNKNOWN` string.
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 9294a75..2b39e20 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import java.util.List;
+import java.util.*;
 import lombok.*;
 
 /**
@@ -46,5 +46,19 @@ public class ServiceMetaInfo {
         private String value;
     }
 
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ServiceMetaInfo info = (ServiceMetaInfo)o;
+        return Objects.equals(serviceName, info.serviceName) &&
+            Objects.equals(serviceInstanceName, info.serviceInstanceName);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(serviceName, serviceInstanceName);
+    }
+
     public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN");
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
index 6d3be3b..f3ea308 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.aop.server.receiver.mesh;
 import com.google.gson.JsonObject;
 import org.apache.skywalking.apm.network.common.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
 import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
@@ -46,12 +47,18 @@ public class ServiceMeshMetricDataDecorator {
         boolean isRegistered = true;
         sourceServiceId = origin.getSourceServiceId();
         if (sourceServiceId == Const.NONE) {
-            sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(origin.getSourceServiceName(), null);
-            if (sourceServiceId != Const.NONE) {
-                getNewDataBuilder().setSourceServiceId(sourceServiceId);
-            } else {
-                isRegistered = false;
+            String sourceServiceName = origin.getSourceServiceName();
+            // sourceServiceName is optional now,
+            // which means only generate dest service traffic.
+            if (!StringUtil.isEmpty(sourceServiceName)) {
+                sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(sourceServiceName, null);
+                if (sourceServiceId != Const.NONE) {
+                    getNewDataBuilder().setSourceServiceId(sourceServiceId);
+                } else {
+                    isRegistered = false;
+                }
             }
+            // No service name, service instance will be ignored too.
         }
         sourceServiceInstanceId = origin.getSourceServiceInstanceId();
         if (sourceServiceId != Const.NONE && sourceServiceInstanceId == Const.NONE) {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 4731d48..da142b0 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -98,8 +98,13 @@ public class TelemetryDataDispatcher {
             toServiceInstance(decorator, minuteTimeBucket);
             toEndpoint(decorator, minuteTimeBucket);
         }
-        toServiceRelation(decorator, minuteTimeBucket);
-        toServiceInstanceRelation(decorator, minuteTimeBucket);
+
+        int sourceServiceId = metric.getSourceServiceId();
+        // Don't generate relation, if no source.
+        if (sourceServiceId != Const.NONE) {
+            toServiceRelation(decorator, minuteTimeBucket);
+            toServiceInstanceRelation(decorator, minuteTimeBucket);
+        }
     }
 
     private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
@@ -108,20 +113,23 @@ public class TelemetryDataDispatcher {
         int heartbeatCycle = 10000;
         // source
         int instanceId = metric.getSourceServiceInstanceId();
-        ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
-        if (Objects.nonNull(serviceInstanceInventory)) {
-            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
-                // trigger heartbeat every 10s.
-                SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
-                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+        // Don't generate source heartbeat, if no source.
+        if (instanceId != Const.NONE) {
+            ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+            if (Objects.nonNull(serviceInstanceInventory)) {
+                if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+                    // trigger heartbeat every 10s.
+                    SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+                    SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+                }
+            } else {
+                logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
             }
-        } else {
-            logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }
 
         // dest
         instanceId = metric.getDestServiceInstanceId();
-        serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+        ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
             if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
                 // trigger heartbeat every 10s.