You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2019/04/17 05:54:22 UTC

[incubator-skywalking] branch envoy-access-log updated: Polish codes and test case

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch envoy-access-log
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/envoy-access-log by this push:
     new 33438c3  Polish codes and test case
     new 8bf82cf  Merge remote-tracking branch 'origin/envoy-access-log' into envoy-access-log
33438c3 is described below

commit 33438c3d5a311c9c6dc07d6343fd9bbaa99e5881
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 17 13:53:49 2019 +0800

    Polish codes and test case
---
 .../receiver/envoy/als/DependencyResource.java     | 19 +++++++
 .../oap/server/receiver/envoy/als/Fetcher.java     | 19 +++++++
 .../envoy/als/K8sALSServiceMeshHTTPAnalysis.java   | 65 +++++++++++-----------
 .../server/receiver/envoy/als/ServiceMetaInfo.java |  2 +
 .../receiver/envoy/als/DependencyResourceTest.java | 19 +++++++
 .../receiver/envoy/als/K8sHTTPAnalysisTest.java    | 40 ++++++-------
 6 files changed, 110 insertions(+), 54 deletions(-)

diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
index 30c6243..e653990 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import io.kubernetes.client.models.V1ObjectMeta;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
index 3bed547..7efcc8a 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import io.kubernetes.client.ApiException;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
index 4e9a848..bd539d9 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
@@ -30,7 +30,6 @@ import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
 import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import io.kubernetes.client.ApiClient;
-import io.kubernetes.client.ApiException;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.apis.ExtensionsV1beta1Api;
@@ -38,6 +37,8 @@ import io.kubernetes.client.models.V1ObjectMeta;
 import io.kubernetes.client.models.V1Pod;
 import io.kubernetes.client.models.V1PodList;
 import io.kubernetes.client.util.Config;
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
 import org.apache.skywalking.apm.network.common.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.Protocol;
@@ -47,7 +48,6 @@ import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.Executors;
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
     private static final Logger logger = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class);
 
+    @Getter(AccessLevel.PROTECTED)
     private final AtomicReference<Map<String, ServiceMetaInfo>> ipServiceMap = new AtomicReference<>();
 
     private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
@@ -73,38 +74,32 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
     }
 
     @Override public void init(EnvoyMetricReceiverConfig config) {
-        executorService.scheduleAtFixedRate(() -> {
-            try {
-                loadPodInfo();
-            } catch (Throwable th) {
-                logger.error("run load pod error", th);
-            }
-        }, 0,15, TimeUnit.SECONDS);
+        executorService.scheduleAtFixedRate(this::loadPodInfo, 0,15, TimeUnit.SECONDS);
+    }
+
+    private boolean invalidPodList() {
+        Map<String, ServiceMetaInfo> map = ipServiceMap.get();
+        return map == null || map.isEmpty();
     }
 
     private void loadPodInfo() {
-        ApiClient client;
-        try {
-            client = Config.defaultClient();
-        } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-        client.getHttpClient().setReadTimeout(20, TimeUnit.SECONDS);
-        Configuration.setDefaultApiClient(client);
-        CoreV1Api api = new CoreV1Api();
-        V1PodList list;
         try {
-            list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
-        } catch (ApiException e) {
-            throw new RuntimeException(e);
-        }
-        Map<String, ServiceMetaInfo> ipMap = new HashMap<>(list.getItems().size());
-        long startTime = System.nanoTime();
-        for (V1Pod item : list.getItems()) {
-            ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata()));
+            ApiClient client = Config.defaultClient();
+            client.getHttpClient().setReadTimeout(20, TimeUnit.SECONDS);
+            Configuration.setDefaultApiClient(client);
+            CoreV1Api api = new CoreV1Api();
+            V1PodList list = api.listPodForAllNamespaces(null, null, null,
+                    null, null, null, null, null, null);
+            Map<String, ServiceMetaInfo> ipMap = new HashMap<>(list.getItems().size());
+            long startTime = System.nanoTime();
+            for (V1Pod item : list.getItems()) {
+                ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata()));
+            }
+            logger.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000);
+            ipServiceMap.set(ipMap);
+        } catch (Throwable th) {
+            logger.error("run load pod error", th);
         }
-        logger.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000);
-        ipServiceMap.set(ipMap);
     }
 
     private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) {
@@ -137,6 +132,9 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
 
     @Override public List<Source> analysis(StreamAccessLogsMessage.Identifier identifier,
         HTTPAccessLogEntry entry, Role role) {
+        if (invalidPodList()) {
+            return Collections.emptyList();
+        }
         switch (role) {
             case PROXY:
                 analysisProxy(identifier, entry);
@@ -342,17 +340,16 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
      * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
      */
     protected ServiceMetaInfo find(String ip, int port) {
-        ServiceMetaInfo result = new ServiceMetaInfo();
-        result.setServiceName("UNKNOWN");
-        result.setServiceInstanceName("UNKNOWN");
         Map<String, ServiceMetaInfo> map = ipServiceMap.get();
         if (map == null) {
-            return result;
+            logger.debug("Unknown ip {}, ip -> service is null", ip);
+            return ServiceMetaInfo.UNKNOWN;
         }
         if (map.containsKey(ip)) {
             return map.get(ip);
         }
-        return result;
+        logger.debug("Unknown ip {}, ip -> service is {}", ip, map);
+        return ServiceMetaInfo.UNKNOWN;
     }
 
     protected void forward(ServiceMeshMetric metric) {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 83482ef..822f873 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -26,6 +26,7 @@ import lombok.*;
  */
 @Getter
 @Setter
+@ToString
 public class ServiceMetaInfo {
     private String serviceName;
     private String serviceInstanceName;
@@ -42,6 +43,7 @@ public class ServiceMetaInfo {
     @Setter
     @Getter
     @RequiredArgsConstructor
+    @ToString
     public static class KeyValue {
         private final String key;
         private final String value;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
index e1cdf47..b37ebed 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import io.kubernetes.client.ApiException;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
index 9df3134..f54a22b 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
@@ -18,19 +18,29 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.util.JsonFormat;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.io.*;
 import java.util.*;
 import org.apache.skywalking.apm.network.common.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
 import org.junit.*;
 
 public class K8sHTTPAnalysisTest {
+
+    private MockK8sAnalysis analysis;
+
+    @Before
+    public void setUp() {
+        analysis = new MockK8sAnalysis();
+        analysis.init(null);
+    }
+
     @Test
     public void testIngressRoleIdentify() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -42,7 +52,6 @@ public class K8sHTTPAnalysisTest {
 
     @Test
     public void testSidecarRoleIdentify() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -54,7 +63,6 @@ public class K8sHTTPAnalysisTest {
 
     @Test
     public void testIngressMetric() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -77,7 +85,6 @@ public class K8sHTTPAnalysisTest {
 
     @Test
     public void testIngress2SidecarMetric() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress2sidecar.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -95,7 +102,6 @@ public class K8sHTTPAnalysisTest {
 
     @Test
     public void testSidecar2SidecarServerMetric() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -113,7 +119,6 @@ public class K8sHTTPAnalysisTest {
 
     @Test
     public void testSidecar2SidecarClientMetric() throws IOException {
-        MockK8sAnalysis analysis = new MockK8sAnalysis();
         try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-client-sidecar.msg"))) {
             StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
@@ -133,23 +138,18 @@ public class K8sHTTPAnalysisTest {
         private List<ServiceMeshMetric> metrics = new ArrayList<>();
 
         @Override
-        protected void forward(ServiceMeshMetric metric) {
-            metrics.add(metric);
+        public void init(EnvoyMetricReceiverConfig config) {
+            getIpServiceMap().set(ImmutableMap.of(
+                    "10.44.2.56", new ServiceMetaInfo("ingress", "ingress-Inst"),
+                    "10.44.2.54", new ServiceMetaInfo("productpage", "productpage-Inst"),
+                    "10.44.6.66", new ServiceMetaInfo("detail", "detail-Inst"),
+                    "10.44.2.55", new ServiceMetaInfo("review", "detail-Inst")
+            ));
         }
 
         @Override
-        protected ServiceMetaInfo find(String ip, int port) {
-            switch (ip) {
-                case "10.44.2.56":
-                    return new ServiceMetaInfo("ingress", "ingress-Inst");
-                case "10.44.2.54":
-                    return new ServiceMetaInfo("productpage", "productpage-Inst");
-                case "10.44.6.66":
-                    return new ServiceMetaInfo("detail", "detail-Inst");
-                case "10.44.2.55":
-                    return new ServiceMetaInfo("review", "detail-Inst");
-            }
-            return ServiceMetaInfo.UNKNOWN;
+        protected void forward(ServiceMeshMetric metric) {
+            metrics.add(metric);
         }
     }