You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/05/17 07:29:46 UTC

[skywalking] branch master updated: Support ALS and observe service mesh without Mixer (#2460)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca7f7b  Support ALS and observe service mesh without Mixer (#2460)
3ca7f7b is described below

commit 3ca7f7b73cbe219d64a48e2844cb47377157c851
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri May 17 15:29:38 2019 +0800

    Support ALS and observe service mesh without Mixer (#2460)
    
    * Add ALS proto and receiver in envoy
---
 README.md                                          |   5 +-
 docker/oap/docker-entrypoint.sh                    |   9 +-
 docs/en/setup/README.md                            |   4 +-
 docs/en/setup/backend/backend-receivers.md         |   2 +-
 docs/en/setup/envoy/als_setting.md                 |  17 +
 .../{README.md => metrics_service_setting.md}      |   0
 oap-server/pom.xml                                 |   2 +-
 .../envoy-metrics-receiver-plugin/pom.xml          |  10 +
 .../envoy/AccessLogServiceGRPCHandler.java         | 123 +++++++
 .../receiver/envoy/EnvoyMetricReceiverConfig.java  |  35 ++
 .../envoy/EnvoyMetricReceiverProvider.java         |   9 +-
 .../server/receiver/envoy/als/ALSHTTPAnalysis.java |  42 +++
 .../receiver/envoy/als/DependencyResource.java     |  63 ++++
 .../oap/server/receiver/envoy/als/Fetcher.java     |  48 +++
 .../envoy/als/K8sALSServiceMeshHTTPAnalysis.java   | 372 +++++++++++++++++++++
 .../oap/server/receiver/envoy/als/Role.java        |  37 ++
 .../server/receiver/envoy/als/ServiceMetaInfo.java |  67 ++++
 ...g.oap.server.receiver.envoy.als.ALSHTTPAnalysis |  20 ++
 .../receiver/envoy/als/DependencyResourceTest.java |  97 ++++++
 .../receiver/envoy/als/K8sHTTPAnalysisTest.java    | 164 +++++++++
 .../src/test/resources/envoy-ingress.msg           |  89 +++++
 .../src/test/resources/envoy-ingress2sidecar.msg   |  99 ++++++
 .../test/resources/envoy-mesh-client-sidecar.msg   |  92 +++++
 .../test/resources/envoy-mesh-server-sidecar.msg   |  96 ++++++
 .../src/main/proto/envoy/api/v2/core/address.proto | 121 +++++++
 .../src/main/proto/envoy/api/v2/core/base.proto    | 176 ++++++++++
 .../proto/envoy/data/accesslog/v2/accesslog.proto  | 335 +++++++++++++++++++
 .../proto/envoy/service/accesslog/v2/als.proto     |  73 ++++
 .../mesh/ServiceMeshMetricDataDecorator.java       |  17 +-
 .../receiver/mesh/TelemetryDataDispatcher.java     |  30 +-
 .../src/main/assembly/application.yml              |   1 +
 .../src/main/resources/application.yml             |   1 +
 32 files changed, 2232 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index b28f8ac..4c6a8bd 100644
--- a/README.md
+++ b/README.md
@@ -37,9 +37,10 @@ including
 1. Java, [.NET Core](https://github.com/SkyAPM/SkyAPM-dotnet), [NodeJS](https://github.com/SkyAPM/SkyAPM-nodejs) and [PHP](https://github.com/SkyAPM/SkyAPM-php-sdk) auto-instrument agents in SkyWalking format
 1. Manual-instrument [Go agent](https://github.com/tetratelabs/go2sky) in SkyWalking format.
 1. Istio telemetry format
-1. Zipkin v1/v2 format
+1. Envoy gRPC Access Log Service (ALS) format in Istio controlled service mesh
+1. Envoy Metrics Service format.
+1. Zipkin v1/v2 format.
 1. Jaeger gRPC format.
-1. Envoy metrics format (the metrics entries itself is prometheus client [metrics family](https://github.com/prometheus/client_model/blob/fd36f4220a901265f90734c3183c5f0c91daa0b8/metrics.proto#L77))
 
 
 # Document
diff --git a/docker/oap/docker-entrypoint.sh b/docker/oap/docker-entrypoint.sh
index 764cf01..b44d612 100755
--- a/docker/oap/docker-entrypoint.sh
+++ b/docker/oap/docker-entrypoint.sh
@@ -186,8 +186,6 @@ service-mesh:
     bufferFileCleanWhenRestart: \${SW_SERVICE_MESH_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
 istio-telemetry:
   default:
-envoy-metric:
-  default:
 query:
   graphql:
     path: \${SW_QUERY_GRAPHQL_PATH:/graphql}
@@ -197,7 +195,14 @@ telemetry:
   prometheus:
     host: \${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
     port: \${SW_TELEMETRY_PROMETHEUS_PORT:1234}
+envoy-metric:
+  default:
 EOT
+    if [[ "$SW_ENVOY_ALS_ENABLED" = "true" ]]; then
+        cat <<EOT >> ${var_application_file}
+    alsHTTPAnalysis: \${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
+EOT
+    fi
 
     if [[ "$SW_RECEIVER_ZIPKIN_ENABLED" = "true" ]]; then
         cat <<EOT >> ${var_application_file}
diff --git a/docs/en/setup/README.md b/docs/en/setup/README.md
index 43af35c..801fd2e 100644
--- a/docs/en/setup/README.md
+++ b/docs/en/setup/README.md
@@ -22,10 +22,12 @@ You could go to their project repositories to find out the releases and how to u
 ## Service Mesh
   - Istio
     - [SkyWalking on Istio](istio/README.md). Introduce how to use Istio Mixer bypass Adapter to work with SkyWalking.
+  - Envoy
+    - Use [ALS(access log service)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) to observe service mesh, without Mixer. Follow [document](envoy/als_setting.md) to open it.
 
 ## Proxy
   - [Envoy Proxy](https://www.envoyproxy.io/)
-    - [Sending metrics to Skywalking from Envoy](envoy/README.md). How to send metrics from Envoy to SkyWalking using [Metrics service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto.html).
+    - [Sending metrics to Skywalking from Envoy](envoy/metrics_service_setting.md). How to send metrics from Envoy to SkyWalking using [Metrics service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto.html).
 
 ## Setup backend
 Follow [backend and UI setup document](backend/backend-ui-setup.md) to understand and config the backend for different
diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md
index a5cd09b..2dbc589 100644
--- a/docs/en/setup/backend/backend-receivers.md
+++ b/docs/en/setup/backend/backend-receivers.md
@@ -10,7 +10,7 @@ We have following receivers, and `default` implementors are provided in our Apac
 1. **service-mesh**. gRPC services accept data from inbound mesh probes.
 1. **receiver-jvm**. gRPC services accept JVM metrics data.
 1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
-1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics. 
+1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics. 
 1. **receiver_zipkin**. See [details](#zipkin-receiver).
 1. **receiver_jaeger**. See [details](#jaeger-receiver).
 
diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md
new file mode 100644
index 0000000..b57b608
--- /dev/null
+++ b/docs/en/setup/envoy/als_setting.md
@@ -0,0 +1,17 @@
+# Observe service mesh through ALS
+Envoy [ALS(access log service)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) provides
+fully logs about RPC routed, including HTTP and TCP.
+
+You need three steps to open ALS.
+1. Right now, Istio pilot hasn't supported to open ALS, so you have to change pilot codes.
+1. Open SkyWalking [envoy receiver](../backend/backend-receivers.md).
+1. Active ALS k8s-mesh analysis
+```yaml
+envoy-metric:
+  default:
+    alsHTTPAnalysis:
+      - k8s-mesh
+```
+
+Notice, only use this when using envoy under Istio controlled.
+Otherwise, you need to implement your own `ALSHTTPAnalysis` and register it to receiver.
\ No newline at end of file
diff --git a/docs/en/setup/envoy/README.md b/docs/en/setup/envoy/metrics_service_setting.md
similarity index 100%
rename from docs/en/setup/envoy/README.md
rename to docs/en/setup/envoy/metrics_service_setting.md
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index b701688..6c4948c 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -64,7 +64,7 @@
         <commons-io.version>2.6</commons-io.version>
         <elasticsearch.version>6.3.2</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
-        <kubernetes.version>2.0.0</kubernetes.version>
+        <kubernetes.version>4.0.0</kubernetes.version>
         <hikaricp.version>3.1.0</hikaricp.version>
         <zipkin.version>2.9.1</zipkin.version>
         <caffeine.version>2.6.2</caffeine.version>
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
index 0b665db..9a78fb9 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
@@ -39,6 +39,16 @@
             <artifactId>skywalking-mesh-receiver-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.kubernetes</groupId>
+            <artifactId>client-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>3.5.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
new file mode 100644
index 0000000..f534a87
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy;
+
+import io.envoyproxy.envoy.service.accesslog.v2.*;
+import io.grpc.stub.StreamObserver;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.envoy.als.*;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.*;
+import org.slf4j.*;
+
+public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
+    private static final Logger logger = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
+    private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
+    private final SourceReceiver sourceReceiver;
+    private final CounterMetrics counter;
+    private final HistogramMetrics histogram;
+    private final CounterMetrics sourceDispatcherCounter;
+
+    public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) {
+        ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
+        envoyHTTPAnalysisList = new ArrayList<>();
+        for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
+            for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
+                if (httpAnalysisName.equals(httpAnalysis.name())) {
+                    httpAnalysis.init(config);
+                    envoyHTTPAnalysisList.add(httpAnalysis);
+                }
+            }
+        }
+
+        logger.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
+
+        sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+
+        MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
+        counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+    }
+
+    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(
+        StreamObserver<StreamAccessLogsResponse> responseObserver) {
+        return new StreamObserver<StreamAccessLogsMessage>() {
+            private volatile boolean isFirst = true;
+            private Role role;
+            private StreamAccessLogsMessage.Identifier identifier;
+
+            @Override public void onNext(StreamAccessLogsMessage message) {
+                counter.inc();
+
+                HistogramMetrics.Timer timer = histogram.createTimer();
+                try {
+                    if (isFirst) {
+                        identifier = message.getIdentifier();
+                        isFirst = false;
+                        role = Role.NONE;
+                        for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
+                            role = analysis.identify(identifier, role);
+                        }
+                    }
+
+                    StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}",
+                            identifier.getNode().getId(), role, logCase, message);
+                    }
+
+                    switch (logCase) {
+                        case HTTP_LOGS:
+                            StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
+
+                            List<Source> sourceResult = new ArrayList<>();
+                            for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
+                                logs.getLogEntryList().forEach(log -> {
+                                    sourceResult.addAll(analysis.analysis(identifier, log, role));
+                                });
+                            }
+
+                            sourceDispatcherCounter.inc(sourceResult.size());
+                            sourceResult.forEach(sourceReceiver::receive);
+                    }
+                } finally {
+                    timer.finish();
+                }
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error("Error in receiving access log from envoy", throwable);
+                responseObserver.onCompleted();
+            }
+
+            @Override public void onCompleted() {
+                responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
new file mode 100644
index 0000000..b7ff4d8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+/**
+ * @author wusheng,gaohongtao
+ */
+public class EnvoyMetricReceiverConfig extends ModuleConfig {
+    private String alsHTTPAnalysis;
+
+    public List<String> getAlsHTTPAnalysis() {
+        return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
index 1631615..398eac9 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
@@ -27,6 +27,12 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
  * @author wusheng
  */
 public class EnvoyMetricReceiverProvider extends ModuleProvider {
+    private final EnvoyMetricReceiverConfig config;
+
+    public EnvoyMetricReceiverProvider() {
+        config = new EnvoyMetricReceiverConfig();
+    }
+
     @Override public String name() {
         return "default";
     }
@@ -36,7 +42,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
     }
 
     @Override public ModuleConfig createConfigBeanIfAbsent() {
-        return null;
+        return config;
     }
 
     @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@@ -46,6 +52,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
     @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
         GRPCHandlerRegister service = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
         service.addHandler(new MetricServiceGRPCHandler(getManager()));
+        service.addHandler(new AccessLogServiceGRPCHandler(getManager(), config));
     }
 
     @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
new file mode 100644
index 0000000..1de742d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+
+/**
+ * Analysis source metrics from ALS
+ *
+ * @author wusheng
+ */
+public interface ALSHTTPAnalysis {
+    String name();
+
+    void init(EnvoyMetricReceiverConfig config);
+
+    List<Source> analysis(StreamAccessLogsMessage.Identifier identifier,
+        HTTPAccessLogEntry entry, Role role);
+
+    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier,
+        Role prev);
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
new file mode 100644
index 0000000..e653990
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1OwnerReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+@RequiredArgsConstructor
+class DependencyResource {
+    @Getter(AccessLevel.PACKAGE)
+    private final V1ObjectMeta metadata;
+
+    private boolean stop;
+
+    DependencyResource getOwnerResource(final String kind, final Fetcher transform) {
+        if (stop) {
+            return this;
+        }
+        if (metadata.getOwnerReferences() == null) {
+            stop = true;
+            return this;
+        }
+        V1OwnerReference ownerReference = null;
+        for (V1OwnerReference each : metadata.getOwnerReferences()) {
+            if (each.getKind().equals(kind)) {
+                ownerReference = each;
+                break;
+            }
+        }
+        if (ownerReference == null) {
+            stop = true;
+            return this;
+        }
+        Optional<V1ObjectMeta> metaOptional = transform.apply(ownerReference);
+        if (!metaOptional.isPresent()) {
+            stop = true;
+            return this;
+        }
+        return new DependencyResource(metaOptional.get());
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
new file mode 100644
index 0000000..7efcc8a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1OwnerReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+interface Fetcher extends Function<V1OwnerReference, Optional<V1ObjectMeta>> {
+
+    Logger logger = LoggerFactory.getLogger(Fetcher.class);
+
+    V1ObjectMeta go(V1OwnerReference ownerReference) throws ApiException;
+
+    default Optional<V1ObjectMeta> apply(V1OwnerReference ownerReference) {
+        try {
+            return Optional.ofNullable(go(ownerReference));
+        } catch (final ApiException e) {
+            logger.error("code:{} header:{} body:{}", e.getCode(), e.getResponseHeaders(), e.getResponseBody());
+            return Optional.empty();
+        } catch (final Throwable th) {
+            logger.error("other errors", th);
+            return Optional.empty();
+        }
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
new file mode 100644
index 0000000..64aedc3
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import io.envoyproxy.envoy.api.v2.core.Address;
+import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.api.v2.core.SocketAddress;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.apis.ExtensionsV1beta1Api;
+import io.kubernetes.client.models.*;
+import io.kubernetes.client.util.Config;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
+import org.apache.skywalking.apm.network.common.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Analysis log based on ingress and mesh scenarios.
+ *
+ * @author wusheng
+ */
+public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
+    private static final Logger logger = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class);
+
+    private static final String ADDRESS_TYPE_INTERNAL_IP = "InternalIP";
+
+    @Getter(AccessLevel.PROTECTED)
+    private final AtomicReference<Map<String, ServiceMetaInfo>> ipServiceMap = new AtomicReference<>();
+
+    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+            .setNameFormat("load-pod-%d").setDaemon(true).build());
+
+    @Override public String name() {
+        return "k8s-mesh";
+    }
+
+    @Override public void init(EnvoyMetricReceiverConfig config) {
+        executorService.scheduleAtFixedRate(this::loadPodInfo, 0,15, TimeUnit.SECONDS);
+    }
+
+    private boolean invalidPodList() {
+        Map<String, ServiceMetaInfo> map = ipServiceMap.get();
+        return map == null || map.isEmpty();
+    }
+
+    private void loadPodInfo() {
+        try {
+            ApiClient client = Config.defaultClient();
+            client.getHttpClient().setReadTimeout(20, TimeUnit.SECONDS);
+            Configuration.setDefaultApiClient(client);
+            CoreV1Api api = new CoreV1Api();
+            V1PodList list = api.listPodForAllNamespaces(null, null, null,
+                    null, null, null, null, null, null);
+            Map<String, ServiceMetaInfo> ipMap = new HashMap<>(list.getItems().size());
+            long startTime = System.nanoTime();
+            for (V1Pod item : list.getItems()) {
+                if (item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) {
+                    logger.warn("Pod {}.{} is removed because hostIP and podIP are identical ", item.getMetadata().getName());
+                    continue;
+                }
+                ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata()));
+            }
+            logger.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000);
+            ipServiceMap.set(ipMap);
+        } catch (Throwable th) {
+            logger.error("run load pod error", th);
+        }
+    }
+
+    private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) {
+        ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api();
+        DependencyResource dr = new DependencyResource(podMeta);
+        DependencyResource meta = dr
+                .getOwnerResource("ReplicaSet", ownerReference ->
+                        extensionsApi.readNamespacedReplicaSet(ownerReference.getName(), podMeta.getNamespace(),
+                "", true, true).getMetadata());
+        ServiceMetaInfo result = new ServiceMetaInfo();
+        if (meta.getMetadata().getOwnerReferences() != null && meta.getMetadata().getOwnerReferences().size() > 0) {
+            V1OwnerReference owner = meta.getMetadata().getOwnerReferences().get(0);
+            result.setServiceName(String.format("%s.%s", owner.getName(), meta.getMetadata().getNamespace()));
+        } else {
+            result.setServiceName(String.format("%s.%s", meta.getMetadata().getName(), meta.getMetadata().getNamespace()));
+        }
+        result.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace()));
+        result.setTags(transformLabelsToTags(podMeta.getLabels()));
+        return result;
+    }
+
+    private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) {
+        if (labels == null || labels.size() < 1) {
+            return Collections.emptyList();
+        }
+        List<ServiceMetaInfo.KeyValue> result = new ArrayList<>(labels.size());
+        for (Map.Entry<String, String> each : labels.entrySet()) {
+            result.add(new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue()));
+        }
+        return result;
+    }
+
+    @Override public List<Source> analysis(StreamAccessLogsMessage.Identifier identifier,
+        HTTPAccessLogEntry entry, Role role) {
+        if (invalidPodList()) {
+            return Collections.emptyList();
+        }
+        switch (role) {
+            case PROXY:
+                analysisProxy(identifier, entry);
+                break;
+            case SIDECAR:
+                return analysisSideCar(identifier, entry);
+        }
+
+        return Collections.emptyList();
+    }
+
+    protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier identifier,
+        HTTPAccessLogEntry entry) {
+        List<Source> sources = new ArrayList<>();
+        AccessLogCommon properties = entry.getCommonProperties();
+        if (properties != null) {
+            String cluster = properties.getUpstreamCluster();
+            if (cluster != null) {
+                long startTime = formatAsLong(properties.getStartTime());
+                long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+                HTTPRequestProperties request = entry.getRequest();
+                String endpoint = "/";
+                Protocol protocol = Protocol.HTTP;
+                if (request != null) {
+                    endpoint = request.getPath();
+                    String schema = request.getScheme();
+                    if (schema.equals("http") || schema.equals("https")) {
+                        protocol = Protocol.HTTP;
+                    } else {
+                        protocol = Protocol.gRPC;
+                    }
+                }
+                HTTPResponseProperties response = entry.getResponse();
+                int responseCode = 200;
+                if (response != null) {
+                    responseCode = response.getResponseCode().getValue();
+                }
+                boolean status = responseCode >= 200 && responseCode < 400;
+
+                Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
+                ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress(),
+                    downstreamRemoteAddress.getSocketAddress().getPortValue());
+                Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+                ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress(),
+                    downstreamLocalAddress.getSocketAddress().getPortValue());
+                if (cluster.startsWith("inbound|")) {
+                    // Server side
+                    if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                        // Ingress -> sidecar(server side)
+                        // Mesh telemetry without source, the relation would be generated.
+                        ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                            .setEndTime(startTime + duration)
+                            .setDestServiceName(localService.getServiceName())
+                            .setDestServiceInstance(localService.getServiceInstanceName())
+                            .setEndpoint(endpoint).setLatency((int)duration)
+                            .setResponseCode(Math.toIntExact(responseCode))
+                            .setStatus(status).setProtocol(protocol)
+                            .setDetectPoint(DetectPoint.server)
+                            .build();
+
+                        logger.debug("Transformed ingress->sidecar inbound mesh metric {}", metric);
+                        forward(metric);
+                    } else {
+                        // sidecar -> sidecar(server side)
+                        ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                            .setEndTime(startTime + duration)
+                            .setSourceServiceName(downstreamService.getServiceName())
+                            .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                            .setDestServiceName(localService.getServiceName())
+                            .setDestServiceInstance(localService.getServiceInstanceName())
+                            .setEndpoint(endpoint).setLatency((int)duration)
+                            .setResponseCode(Math.toIntExact(responseCode))
+                            .setStatus(status).setProtocol(protocol)
+                            .setDetectPoint(DetectPoint.server)
+                            .build();
+
+                        logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+                        forward(metric);
+                    }
+                } else if (cluster.startsWith("outbound|")) {
+                    // sidecar(client side) -> sidecar
+                    Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+                    ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress(),
+                        upstreamRemoteAddress.getSocketAddress().getPortValue());
+
+                    ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                        .setEndTime(startTime + duration)
+                        .setSourceServiceName(downstreamService.getServiceName())
+                        .setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                        .setDestServiceName(destService.getServiceName())
+                        .setDestServiceInstance(destService.getServiceInstanceName())
+                        .setEndpoint(endpoint).setLatency((int)duration)
+                        .setResponseCode(Math.toIntExact(responseCode))
+                        .setStatus(status).setProtocol(protocol)
+                        .setDetectPoint(DetectPoint.client)
+                        .build();
+
+                    logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+                    forward(metric);
+
+                }
+            }
+        }
+        return sources;
+    }
+
+    protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier,
+        HTTPAccessLogEntry entry) {
+        AccessLogCommon properties = entry.getCommonProperties();
+        if (properties != null) {
+            Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+            Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
+            Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+            if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) {
+                SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
+                ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress(), downstreamRemoteAddressSocketAddress.getPortValue());
+
+                SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
+                ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress(), downstreamLocalAddressSocketAddress.getPortValue());
+
+                long startTime = formatAsLong(properties.getStartTime());
+                long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+                HTTPRequestProperties request = entry.getRequest();
+                String endpoint = "/";
+                Protocol protocol = Protocol.HTTP;
+                if (request != null) {
+                    endpoint = request.getPath();
+                    String schema = request.getScheme();
+                    if (schema.equals("http") || schema.equals("https")) {
+                        protocol = Protocol.HTTP;
+                    } else {
+                        protocol = Protocol.gRPC;
+                    }
+                }
+                HTTPResponseProperties response = entry.getResponse();
+                int responseCode = 200;
+                if (response != null) {
+                    responseCode = response.getResponseCode().getValue();
+                }
+                boolean status = responseCode >= 200 && responseCode < 400;
+
+                ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime)
+                    .setEndTime(startTime + duration)
+                    .setSourceServiceName(outside.getServiceName())
+                    .setSourceServiceInstance(outside.getServiceInstanceName())
+                    .setDestServiceName(ingress.getServiceName())
+                    .setDestServiceInstance(ingress.getServiceInstanceName())
+                    .setEndpoint(endpoint).setLatency((int)duration)
+                    .setResponseCode(Math.toIntExact(responseCode))
+                    .setStatus(status).setProtocol(protocol)
+                    .setDetectPoint(DetectPoint.server)
+                    .build();
+
+                logger.debug("Transformed ingress inbound mesh metric {}", metric);
+                forward(metric);
+
+                SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
+                ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress(), upstreamRemoteAddressSocketAddress.getPortValue());
+
+                long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+                long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+                ServiceMeshMetric outboundMetric = ServiceMeshMetric.newBuilder().setStartTime(outboundStartTime)
+                    .setEndTime(outboundEndTime)
+                    .setSourceServiceName(ingress.getServiceName())
+                    .setSourceServiceInstance(ingress.getServiceInstanceName())
+                    .setDestServiceName(targetService.getServiceName())
+                    .setDestServiceInstance(targetService.getServiceInstanceName())
+                    .setEndpoint(endpoint).setLatency((int)(outboundEndTime - outboundStartTime))
+                    .setResponseCode(Math.toIntExact(responseCode))
+                    .setStatus(status).setProtocol(protocol)
+                    .setDetectPoint(DetectPoint.client)
+                    .build();
+
+                logger.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+                forward(outboundMetric);
+            }
+        }
+    }
+
+    @Override public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier,
+        Role prev) {
+        if (alsIdentifier != null) {
+            Node node = alsIdentifier.getNode();
+            if (node != null) {
+                String id = node.getId();
+                if (id.startsWith("router~")) {
+                    return Role.PROXY;
+                } else if (id.startsWith("sidecar~")) {
+                    return Role.SIDECAR;
+                }
+            }
+        }
+
+        return prev;
+    }
+
+    /**
+     * @param ip
+     * @param port
+     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
+     */
+    protected ServiceMetaInfo find(String ip, int port) {
+        Map<String, ServiceMetaInfo> map = ipServiceMap.get();
+        if (map == null) {
+            logger.debug("Unknown ip {}, ip -> service is null", ip);
+            return ServiceMetaInfo.UNKNOWN;
+        }
+        if (map.containsKey(ip)) {
+            return map.get(ip);
+        }
+        logger.debug("Unknown ip {}, ip -> service is {}", map);
+        return ServiceMetaInfo.UNKNOWN;
+    }
+
+    protected void forward(ServiceMeshMetric metric) {
+        TelemetryDataDispatcher.preProcess(metric);
+    }
+
+    private long formatAsLong(Timestamp timestamp) {
+        return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
+    }
+
+    private long formatAsLong(Duration duration) {
+        return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java
new file mode 100644
index 0000000..8842c39
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+/**
+ * The role of envoy in this RPC.
+ */
+public enum Role {
+    /**
+     * Can't identify
+     */
+    NONE,
+    /**
+     * Proxy, such as Ingress, or not mesh
+     */
+    PROXY,
+    /**
+     * Sidecar in mesh
+     */
+    SIDECAR
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
new file mode 100644
index 0000000..822f873
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import java.util.*;
+import lombok.*;
+
+/**
+ * @author wusheng
+ */
+@Getter
+@Setter
+@ToString
+public class ServiceMetaInfo {
+    private String serviceName;
+    private String serviceInstanceName;
+    private List<KeyValue> tags;
+
+    public ServiceMetaInfo() {
+    }
+
+    public ServiceMetaInfo(String serviceName, String serviceInstanceName) {
+        this.serviceName = serviceName;
+        this.serviceInstanceName = serviceInstanceName;
+    }
+
+    @Setter
+    @Getter
+    @RequiredArgsConstructor
+    @ToString
+    public static class KeyValue {
+        private final String key;
+        private final String value;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ServiceMetaInfo info = (ServiceMetaInfo)o;
+        return Objects.equals(serviceName, info.serviceName) &&
+            Objects.equals(serviceInstanceName, info.serviceInstanceName);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(serviceName, serviceInstanceName);
+    }
+
+    public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN");
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
new file mode 100644
index 0000000..c744de5
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+
+org.apache.skywalking.oap.server.receiver.envoy.als.K8sALSServiceMeshHTTPAnalysis
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
new file mode 100644
index 0000000..b37ebed
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1OwnerReference;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class DependencyResourceTest {
+
+    @Parameterized.Parameter
+    public String resourceName;
+
+    @Parameterized.Parameter(1)
+    public ThrowableFunction function;
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"deploy1", (ThrowableFunction) result -> result},
+                {"pod1", (ThrowableFunction) result -> { throw new RuntimeException(); } },
+                {"pod1", (ThrowableFunction) result -> { throw new ApiException(); } },
+                {"pod1", (ThrowableFunction) result -> null},
+                {"rs1", (ThrowableFunction) result -> {
+                    result.setOwnerReferences(null);
+                    return result;
+                } },
+                {"rs1", (ThrowableFunction) result -> {
+                    V1OwnerReference reference1 = new V1OwnerReference();
+                    reference1.setKind("StatefulSet");
+                    reference1.setName("ss1");
+                    result.setOwnerReferences(Collections.singletonList(reference1));
+                    return result;
+                } },
+        });
+    }
+
+    @Test
+    public void test() {
+        V1ObjectMeta meta = new V1ObjectMeta();
+        meta.setName("pod1");
+        V1OwnerReference reference = new V1OwnerReference();
+        reference.setKind("ReplicaSet");
+        reference.setName("rs1");
+        meta.addOwnerReferencesItem(reference);
+        DependencyResource dr = new DependencyResource(meta);
+        DependencyResource drr =  dr.getOwnerResource("ReplicaSet", ownerReference -> {
+            assertThat(ownerReference.getName(), is("rs1"));
+            V1ObjectMeta result = new V1ObjectMeta();
+            result.setName("rs1");
+            V1OwnerReference reference1 = new V1OwnerReference();
+            reference1.setKind("Deployment");
+            reference1.setName("deploy1");
+            result.addOwnerReferencesItem(reference1);
+            return function.go(result);
+        }).getOwnerResource("Deployment", ownerReference -> {
+            assertThat(ownerReference.getName(), is("deploy1"));
+            V1ObjectMeta result = new V1ObjectMeta();
+            result.setName("deploy1");
+            return result;
+        });
+        assertThat(drr.getMetadata().getName(), is(resourceName));
+    }
+
+    interface ThrowableFunction {
+        V1ObjectMeta go(final V1ObjectMeta result) throws ApiException;
+    }
+
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
new file mode 100644
index 0000000..f54a22b
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.util.JsonFormat;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.io.*;
+import java.util.*;
+import org.apache.skywalking.apm.network.common.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
+import org.junit.*;
+
+public class K8sHTTPAnalysisTest {
+
+    private MockK8sAnalysis analysis;
+
+    @Before
+    public void setUp() {
+        analysis = new MockK8sAnalysis();
+        analysis.init(null);
+    }
+
+    @Test
+    public void testIngressRoleIdentify() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+            Role identify = analysis.identify(requestBuilder.getIdentifier(), Role.NONE);
+
+            Assert.assertEquals(Role.PROXY, identify);
+        }
+    }
+
+    @Test
+    public void testSidecarRoleIdentify() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+            Role identify = analysis.identify(requestBuilder.getIdentifier(), Role.NONE);
+
+            Assert.assertEquals(Role.SIDECAR, identify);
+        }
+    }
+
+    @Test
+    public void testIngressMetric() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+
+            analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+
+            Assert.assertEquals(2, analysis.metrics.size());
+
+            ServiceMeshMetric incoming = analysis.metrics.get(0);
+            Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
+            Assert.assertEquals("ingress", incoming.getDestServiceName());
+            Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
+
+            ServiceMeshMetric outgoing = analysis.metrics.get(1);
+            Assert.assertEquals("ingress", outgoing.getSourceServiceName());
+            Assert.assertEquals("productpage", outgoing.getDestServiceName());
+            Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
+        }
+    }
+
+    @Test
+    public void testIngress2SidecarMetric() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress2sidecar.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+
+            analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+
+            Assert.assertEquals(1, analysis.metrics.size());
+
+            ServiceMeshMetric incoming = analysis.metrics.get(0);
+            Assert.assertEquals("", incoming.getSourceServiceName());
+            Assert.assertEquals("productpage", incoming.getDestServiceName());
+            Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
+        }
+    }
+
+    @Test
+    public void testSidecar2SidecarServerMetric() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+
+            analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+
+            Assert.assertEquals(1, analysis.metrics.size());
+
+            ServiceMeshMetric incoming = analysis.metrics.get(0);
+            Assert.assertEquals("productpage", incoming.getSourceServiceName());
+            Assert.assertEquals("review", incoming.getDestServiceName());
+            Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
+        }
+    }
+
+    @Test
+    public void testSidecar2SidecarClientMetric() throws IOException {
+        try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-client-sidecar.msg"))) {
+            StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+
+            analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+
+            Assert.assertEquals(1, analysis.metrics.size());
+
+            ServiceMeshMetric incoming = analysis.metrics.get(0);
+            Assert.assertEquals("productpage", incoming.getSourceServiceName());
+            Assert.assertEquals("detail", incoming.getDestServiceName());
+            Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());
+        }
+    }
+
+    public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis {
+        private List<ServiceMeshMetric> metrics = new ArrayList<>();
+
+        @Override
+        public void init(EnvoyMetricReceiverConfig config) {
+            getIpServiceMap().set(ImmutableMap.of(
+                    "10.44.2.56", new ServiceMetaInfo("ingress", "ingress-Inst"),
+                    "10.44.2.54", new ServiceMetaInfo("productpage", "productpage-Inst"),
+                    "10.44.6.66", new ServiceMetaInfo("detail", "detail-Inst"),
+                    "10.44.2.55", new ServiceMetaInfo("review", "detail-Inst")
+            ));
+        }
+
+        @Override
+        protected void forward(ServiceMeshMetric metric) {
+            metrics.add(metric);
+        }
+    }
+
+    private static InputStream getResourceAsStream(final String resource) {
+        final InputStream in = getContextClassLoader().getResourceAsStream(resource);
+        return in == null ? MetricServiceGRPCHandlerTestMain.class.getResourceAsStream(resource) : in;
+    }
+
+    private static ClassLoader getContextClassLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg
new file mode 100644
index 0000000..0f4917e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+    "identifier": {
+        "node": {
+            "id": "router~10.44.2.56~istio-ingressgateway-699c7dc774-hjxq5.istio-system~istio-system.svc.cluster.local",
+            "cluster": "istio-ingressgateway",
+            "metadata": {
+                "CONFIG_NAMESPACE": "istio-system",
+                "ISTIO_META_INSTANCE_IPS": "10.44.2.56,10.44.2.56,fe80::9ca5:e5ff:fede:6414",
+                "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70",
+                "ISTIO_PROXY_VERSION": "1.1.0",
+                "ISTIO_VERSION": "1.0-dev",
+                "POD_NAME": "istio-ingressgateway-699c7dc774-hjxq5",
+                "ROUTER_MODE": "sni-dnat",
+                "istio": "sidecar"
+            },
+            "locality": { },
+            "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL"
+        },
+        "logName": "als"
+    },
+    "httpLogs": {
+        "logEntry": [
+            {
+                "commonProperties": {
+                    "downstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "10.138.0.14",
+                            "portValue": 51489
+                        }
+                    },
+                    "downstreamLocalAddress": {
+                        "socketAddress": {
+                            "address": "10.44.2.56",
+                            "portValue": 80
+                        }
+                    },
+                    "startTime": "2019-04-13T03:59:53.687224601Z",
+                    "timeToLastRxByte": "0.000031206s",
+                    "timeToFirstUpstreamTxByte": "0.000869250s",
+                    "timeToLastUpstreamTxByte": "0.000881276s",
+                    "timeToFirstUpstreamRxByte": "1.010010710s",
+                    "timeToLastUpstreamRxByte": "1.010423815s",
+                    "timeToFirstDownstreamTxByte": "1.010053396s",
+                    "timeToLastDownstreamTxByte": "1.010432910s",
+                    "upstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "10.44.2.54",
+                            "portValue": 9080
+                        }
+                    },
+                    "upstreamCluster": "outbound|9080||productpage.default.svc.cluster.local"
+                },
+                "protocolVersion": "HTTP11",
+                "request": {
+                    "requestMethod": "GET",
+                    "scheme": "http",
+                    "authority": "35.227.162.132",
+                    "path": "/productpage",
+                    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15",
+                    "forwardedFor": "10.138.0.14",
+                    "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5",
+                    "requestHeadersBytes": "1038"
+                },
+                "response": {
+                    "responseCode": 200,
+                    "responseHeadersBytes": "147",
+                    "responseBodyBytes": "4415"
+                }
+            }
+        ]
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg
new file mode 100644
index 0000000..b641b14
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    // Mock identifier
+    "identifier": {
+            "node": {
+                "id": "sidecar~10.44.2.54~product-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local",
+                "cluster": "product.default",
+                "metadata": {
+                    "CONFIG_NAMESPACE": "default",
+                    "INTERCEPTION_MODE": "REDIRECT",
+                    "ISTIO_META_INSTANCE_IPS": "10.44.2.54,10.44.2.54,fe80::d8e8:b6ff:fed6:f857",
+                    "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70",
+                    "ISTIO_PROXY_VERSION": "1.1.0",
+                    "ISTIO_VERSION": "1.0-dev",
+                    "POD_NAME": "product-v1-d66dcfdc5-kh6v7",
+                    "app": "product",
+                    "istio": "sidecar",
+                    "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews",
+                    "pod-template-hash": "822879871",
+                    "version": "v1"
+                },
+                "locality": { },
+                "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL"
+            },
+            "logName": "als"
+        },
+    // Real log sample
+    "httpLogs": {
+        "logEntry": [
+            {
+                "commonProperties": {
+                    "downstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "10.138.0.14",
+                            "portValue": 0
+                        }
+                    },
+                    "downstreamLocalAddress": {
+                        "socketAddress": {
+                            "address": "10.44.2.54",
+                            "portValue": 9080
+                        }
+                    },
+                    "startTime": "2019-04-13T03:59:53.688609181Z",
+                    "timeToLastRxByte": "0.000081758s",
+                    "timeToFirstUpstreamTxByte": "0.000789220s",
+                    "timeToLastUpstreamTxByte": "0.000808326s",
+                    "timeToFirstUpstreamRxByte": "1.008120501s",
+                    "timeToLastUpstreamRxByte": "1.008369826s",
+                    "timeToFirstDownstreamTxByte": "1.008242458s",
+                    "timeToLastDownstreamTxByte": "1.008378251s",
+                    "upstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "127.0.0.1",
+                            "portValue": 9080
+                        }
+                    },
+                    "upstreamCluster": "inbound|9080|http|productpage.default.svc.cluster.local",
+                    "metadata": {
+                        "filterMetadata": {
+                            "istio_authn": { }
+                        }
+                    }
+                },
+                "protocolVersion": "HTTP11",
+                "request": {
+                    "requestMethod": "GET",
+                    "scheme": "http",
+                    "authority": "35.227.162.132",
+                    "path": "/productpage",
+                    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15",
+                    "forwardedFor": "10.138.0.14",
+                    "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5",
+                    "requestHeadersBytes": "579"
+                },
+                "response": {
+                    "responseCode": 200,
+                    "responseHeadersBytes": "147",
+                    "responseBodyBytes": "4415"
+                }
+            }
+        ]
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg
new file mode 100644
index 0000000..1049818
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    "identifier": {
+            "node": {
+                "id": "sidecar~10.44.2.55~productpage-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local",
+                "cluster": "productpage.default",
+                "metadata": {
+                    "CONFIG_NAMESPACE": "default",
+                    "INTERCEPTION_MODE": "REDIRECT",
+                    "ISTIO_META_INSTANCE_IPS": "10.44.2.55,10.44.2.55,fe80::d8e8:b6ff:fed6:f857",
+                    "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70",
+                    "ISTIO_PROXY_VERSION": "1.1.0",
+                    "ISTIO_VERSION": "1.0-dev",
+                    "POD_NAME": "productpage-v1-d66dcfdc5-kh6v7",
+                    "app": "productpage",
+                    "istio": "sidecar",
+                    "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews",
+                    "pod-template-hash": "822879871",
+                    "version": "v1"
+                },
+                "locality": { },
+                "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL"
+            },
+            "logName": "als"
+        },
+        "httpLogs": {
+            "logEntry": [
+                {
+                    "commonProperties": {
+                        "downstreamRemoteAddress": {
+                            "socketAddress": {
+                                "address": "10.44.2.54",
+                                "portValue": 58996
+                            }
+                        },
+                        "downstreamLocalAddress": {
+                            "socketAddress": {
+                                "address": "10.47.247.180",
+                                "portValue": 9080
+                            }
+                        },
+                        "startTime": "2019-04-13T03:59:53.695750999Z",
+                        "timeToLastRxByte": "0.000082339s",
+                        "timeToFirstUpstreamTxByte": "0.002353100s",
+                        "timeToLastUpstreamTxByte": "0.002362295s",
+                        "timeToFirstUpstreamRxByte": "0.010500490s",
+                        "timeToLastUpstreamRxByte": "0.010735195s",
+                        "timeToFirstDownstreamTxByte": "0.010669993s",
+                        "timeToLastDownstreamTxByte": "0.010745496s",
+                        "upstreamRemoteAddress": {
+                            "socketAddress": {
+                                "address": "10.44.6.66",
+                                "portValue": 9080
+                            }
+                        },
+                        "upstreamCluster": "outbound|9080||details.default.svc.cluster.local"
+                    },
+                    "protocolVersion": "HTTP11",
+                    "request": {
+                        "requestMethod": "GET",
+                        "scheme": "http",
+                        "authority": "details:9080",
+                        "path": "/details/0",
+                        "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15",
+                        "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5",
+                        "requestHeadersBytes": "869"
+                    },
+                    "response": {
+                        "responseCode": 200,
+                        "responseHeadersBytes": "129",
+                        "responseBodyBytes": "178"
+                    }
+                }
+            ]
+        }
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg
new file mode 100644
index 0000000..d02ce2b
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    "identifier": {
+        "node": {
+            "id": "sidecar~10.44.2.55~reviews-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local",
+            "cluster": "reviews.default",
+            "metadata": {
+                "CONFIG_NAMESPACE": "default",
+                "INTERCEPTION_MODE": "REDIRECT",
+                "ISTIO_META_INSTANCE_IPS": "10.44.2.55,10.44.2.55,fe80::d8e8:b6ff:fed6:f857",
+                "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70",
+                "ISTIO_PROXY_VERSION": "1.1.0",
+                "ISTIO_VERSION": "1.0-dev",
+                "POD_NAME": "reviews-v1-d66dcfdc5-kh6v7",
+                "app": "reviews",
+                "istio": "sidecar",
+                "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews",
+                "pod-template-hash": "822879871",
+                "version": "v1"
+            },
+            "locality": { },
+            "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL"
+        },
+        "logName": "als"
+    },
+    "httpLogs": {
+        "logEntry": [
+            {
+                "commonProperties": {
+                    "downstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "10.44.2.54",
+                            "portValue": 58356
+                        }
+                    },
+                    "downstreamLocalAddress": {
+                        "socketAddress": {
+                            "address": "10.44.2.55",
+                            "portValue": 9080
+                        }
+                    },
+                    "startTime": "2019-04-13T03:59:53.712690678Z",
+                    "timeToLastRxByte": "0.000127695s",
+                    "timeToFirstUpstreamTxByte": "0.000841545s",
+                    "timeToLastUpstreamTxByte": "0.000854020s",
+                    "timeToFirstUpstreamRxByte": "0.977617052s",
+                    "timeToLastUpstreamRxByte": "0.977797037s",
+                    "timeToFirstDownstreamTxByte": "0.977764621s",
+                    "timeToLastDownstreamTxByte": "0.977811534s",
+                    "upstreamRemoteAddress": {
+                        "socketAddress": {
+                            "address": "127.0.0.1",
+                            "portValue": 9080
+                        }
+                    },
+                    "upstreamCluster": "inbound|9080|http|reviews.default.svc.cluster.local",
+                    "metadata": {
+                        "filterMetadata": {
+                            "istio_authn": { }
+                        }
+                    }
+                },
+                "protocolVersion": "HTTP11",
+                "request": {
+                    "requestMethod": "GET",
+                    "scheme": "http",
+                    "authority": "reviews:9080",
+                    "path": "/reviews/0",
+                    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15",
+                    "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5",
+                    "requestHeadersBytes": "423"
+                },
+                "response": {
+                    "responseCode": 200,
+                    "responseHeadersBytes": "181",
+                    "responseBodyBytes": "295"
+                }
+            }
+        ]
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto
new file mode 100644
index 0000000..6e76f5b
--- /dev/null
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto
@@ -0,0 +1,121 @@
+syntax = "proto3";
+
+package envoy.api.v2.core;
+
+option java_outer_classname = "AddressProto";
+option java_multiple_files = true;
+option java_package = "io.envoyproxy.envoy.api.v2.core";
+
+import "envoy/api/v2/core/base.proto";
+
+import "google/protobuf/wrappers.proto";
+
+import "validate/validate.proto";
+import "gogoproto/gogo.proto";
+
+option (gogoproto.equal_all) = true;
+
+// [#protodoc-title: Network addresses]
+
+message Pipe {
+  // Unix Domain Socket path. On Linux, paths starting with '@' will use the
+  // abstract namespace. The starting '@' is replaced by a null byte by Envoy.
+  // Paths starting with '@' will result in an error in environments other than
+  // Linux.
+  string path = 1 [(validate.rules).string.min_bytes = 1];
+}
+
+message SocketAddress {
+  enum Protocol {
+    option (gogoproto.goproto_enum_prefix) = false;
+    TCP = 0;
+    // [#not-implemented-hide:]
+    UDP = 1;
+  }
+  Protocol protocol = 1 [(validate.rules).enum.defined_only = true];
+  // The address for this socket. :ref:`Listeners <config_listeners>` will bind
+  // to the address. An empty address is not allowed. Specify ``0.0.0.0`` or ``::``
+  // to bind to any address. [#comment:TODO(zuercher) reinstate when implemented:
+  // It is possible to distinguish a Listener address via the prefix/suffix matching
+  // in :ref:`FilterChainMatch <envoy_api_msg_listener.FilterChainMatch>`.] When used
+  // within an upstream :ref:`BindConfig <envoy_api_msg_core.BindConfig>`, the address
+  // controls the source address of outbound connections. For :ref:`clusters
+  // <envoy_api_msg_Cluster>`, the cluster type determines whether the
+  // address must be an IP (*STATIC* or *EDS* clusters) or a hostname resolved by DNS
+  // (*STRICT_DNS* or *LOGICAL_DNS* clusters). Address resolution can be customized
+  // via :ref:`resolver_name <envoy_api_field_core.SocketAddress.resolver_name>`.
+  string address = 2 [(validate.rules).string.min_bytes = 1];
+  oneof port_specifier {
+    option (validate.required) = true;
+    uint32 port_value = 3 [(validate.rules).uint32.lte = 65535];
+    // This is only valid if :ref:`resolver_name
+    // <envoy_api_field_core.SocketAddress.resolver_name>` is specified below and the
+    // named resolver is capable of named port resolution.
+    string named_port = 4;
+  }
+  // The name of the resolver. This must have been registered with Envoy. If this is
+  // empty, a context dependent default applies. If address is a hostname this
+  // should be set for resolution other than DNS. If the address is a concrete
+  // IP address, no resolution will occur.
+  string resolver_name = 5;
+
+  // When binding to an IPv6 address above, this enables `IPv4 compatibility
+  // <https://tools.ietf.org/html/rfc3493#page-11>`_. Binding to ``::`` will
+  // allow both IPv4 and IPv6 connections, with peer IPv4 addresses mapped into
+  // IPv6 space as ``::FFFF:<IPv4-address>``.
+  bool ipv4_compat = 6;
+}
+
+message TcpKeepalive {
+  // Maximum number of keepalive probes to send without response before deciding
+  // the connection is dead. Default is to use the OS level configuration (unless
+  // overridden, Linux defaults to 9.)
+  google.protobuf.UInt32Value keepalive_probes = 1;
+  // The number of seconds a connection needs to be idle before keep-alive probes
+  // start being sent. Default is to use the OS level configuration (unless
+  // overridden, Linux defaults to 7200s (ie 2 hours.)
+  google.protobuf.UInt32Value keepalive_time = 2;
+  // The number of seconds between keep-alive probes. Default is to use the OS
+  // level configuration (unless overridden, Linux defaults to 75s.)
+  google.protobuf.UInt32Value keepalive_interval = 3;
+}
+
+message BindConfig {
+  // The address to bind to when creating a socket.
+  SocketAddress source_address = 1
+      [(validate.rules).message.required = true, (gogoproto.nullable) = false];
+
+  // Whether to set the *IP_FREEBIND* option when creating the socket. When this
+  // flag is set to true, allows the :ref:`source_address
+  // <envoy_api_field_UpstreamBindConfig.source_address>` to be an IP address
+  // that is not configured on the system running Envoy. When this flag is set
+  // to false, the option *IP_FREEBIND* is disabled on the socket. When this
+  // flag is not set (default), the socket is not modified, i.e. the option is
+  // neither enabled nor disabled.
+  google.protobuf.BoolValue freebind = 2;
+
+  // Additional socket options that may not be present in Envoy source code or
+  // precompiled binaries.
+  repeated SocketOption socket_options = 3;
+}
+
+// Addresses specify either a logical or physical address and port, which are
+// used to tell Envoy where to bind/listen, connect to upstream and find
+// management servers.
+message Address {
+  oneof address {
+    option (validate.required) = true;
+
+    SocketAddress socket_address = 1;
+    Pipe pipe = 2;
+  }
+}
+
+// CidrRange specifies an IP Address and a prefix length to construct
+// the subnet mask for a `CIDR <https://tools.ietf.org/html/rfc4632>`_ range.
+message CidrRange {
+  // IPv4 or IPv6 address, e.g. ``192.0.0.0`` or ``2001:db8::``.
+  string address_prefix = 1 [(validate.rules).string.min_bytes = 1];
+  // Length of prefix, e.g. 0, 32.
+  google.protobuf.UInt32Value prefix_len = 2 [(validate.rules).uint32.lte = 128];
+}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto
index 3be09cd..6b4e931 100644
--- a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto
@@ -17,6 +17,7 @@ import "gogoproto/gogo.proto";
 import "envoy/type/percent.proto";
 
 option (gogoproto.equal_all) = true;
+option (gogoproto.stable_marshaler_all) = true;
 
 // [#protodoc-title: Common types]
 
@@ -78,3 +79,178 @@ message Node {
   // by Envoy in management server RPCs.
   string build_version = 5;
 }
+
+// Metadata provides additional inputs to filters based on matched listeners,
+// filter chains, routes and endpoints. It is structured as a map, usually from
+// filter name (in reverse DNS format) to metadata specific to the filter. Metadata
+// key-values for a filter are merged as connection and request handling occurs,
+// with later values for the same key overriding earlier values.
+//
+// An example use of metadata is providing additional values to
+// http_connection_manager in the envoy.http_connection_manager.access_log
+// namespace.
+//
+// Another example use of metadata is to per service config info in cluster metadata, which may get
+// consumed by multiple filters.
+//
+// For load balancing, Metadata provides a means to subset cluster endpoints.
+// Endpoints have a Metadata object associated and routes contain a Metadata
+// object to match against. There are some well defined metadata used today for
+// this purpose:
+//
+// * ``{"envoy.lb": {"canary": <bool> }}`` This indicates the canary status of an
+//   endpoint and is also used during header processing
+//   (x-envoy-upstream-canary) and for stats purposes.
+message Metadata {
+  // Key is the reverse DNS filter name, e.g. com.acme.widget. The envoy.*
+  // namespace is reserved for Envoy's built-in filters.
+  map<string, google.protobuf.Struct> filter_metadata = 1;
+}
+
+// Runtime derived uint32 with a default when not specified.
+message RuntimeUInt32 {
+  // Default value if runtime value is not available.
+  uint32 default_value = 2;
+
+  // Runtime key to get value for comparison. This value is used if defined.
+  string runtime_key = 3 [(validate.rules).string.min_bytes = 1];
+}
+
+// Envoy supports :ref:`upstream priority routing
+// <arch_overview_http_routing_priority>` both at the route and the virtual
+// cluster level. The current priority implementation uses different connection
+// pool and circuit breaking settings for each priority level. This means that
+// even for HTTP/2 requests, two physical connections will be used to an
+// upstream host. In the future Envoy will likely support true HTTP/2 priority
+// over a single upstream connection.
+enum RoutingPriority {
+  DEFAULT = 0;
+  HIGH = 1;
+}
+
+// HTTP request method.
+enum RequestMethod {
+  option (gogoproto.goproto_enum_prefix) = false;
+  METHOD_UNSPECIFIED = 0;
+  GET = 1;
+  HEAD = 2;
+  POST = 3;
+  PUT = 4;
+  DELETE = 5;
+  CONNECT = 6;
+  OPTIONS = 7;
+  TRACE = 8;
+}
+
+// Header name/value pair.
+message HeaderValue {
+  // Header name.
+  string key = 1 [(validate.rules).string = {min_bytes: 1, max_bytes: 16384}];
+
+  // Header value.
+  //
+  // The same :ref:`format specifier <config_access_log_format>` as used for
+  // :ref:`HTTP access logging <config_access_log>` applies here, however
+  // unknown header values are replaced with the empty string instead of `-`.
+  string value = 2 [(validate.rules).string.max_bytes = 16384];
+}
+
+// Header name/value pair plus option to control append behavior.
+message HeaderValueOption {
+  // Header name/value pair that this option applies to.
+  HeaderValue header = 1 [(validate.rules).message.required = true];
+
+  // Should the value be appended? If true (default), the value is appended to
+  // existing values.
+  google.protobuf.BoolValue append = 2;
+}
+
+// Wrapper for a set of headers.
+message HeaderMap {
+  repeated HeaderValue headers = 1;
+}
+
+// Data source consisting of either a file or an inline value.
+message DataSource {
+  oneof specifier {
+    option (validate.required) = true;
+
+    // Local filesystem data source.
+    string filename = 1 [(validate.rules).string.min_bytes = 1];
+
+    // Bytes inlined in the configuration.
+    bytes inline_bytes = 2 [(validate.rules).bytes.min_len = 1];
+
+    // String inlined in the configuration.
+    string inline_string = 3 [(validate.rules).string.min_bytes = 1];
+  }
+}
+
+// Configuration for transport socket in :ref:`listeners <config_listeners>` and
+// :ref:`clusters <envoy_api_msg_Cluster>`. If the configuration is
+// empty, a default transport socket implementation and configuration will be
+// chosen based on the platform and existence of tls_context.
+message TransportSocket {
+  // The name of the transport socket to instantiate. The name must match a supported transport
+  // socket implementation.
+  string name = 1 [(validate.rules).string.min_bytes = 1];
+
+  // Implementation specific configuration which depends on the implementation being instantiated.
+  // See the supported transport socket implementations for further documentation.
+  oneof config_type {
+    google.protobuf.Struct config = 2;
+
+    google.protobuf.Any typed_config = 3;
+  }
+}
+
+// Generic socket option message. This would be used to set socket options that
+// might not exist in upstream kernels or precompiled Envoy binaries.
+message SocketOption {
+  // An optional name to give this socket option for debugging, etc.
+  // Uniqueness is not required and no special meaning is assumed.
+  string description = 1;
+  // Corresponding to the level value passed to setsockopt, such as IPPROTO_TCP
+  int64 level = 2;
+  // The numeric name as passed to setsockopt
+  int64 name = 3;
+  oneof value {
+    option (validate.required) = true;
+
+    // Because many sockopts take an int value.
+    int64 int_value = 4;
+    // Otherwise it's a byte buffer.
+    bytes buf_value = 5;
+  }
+  enum SocketState {
+    option (gogoproto.goproto_enum_prefix) = false;
+    // Socket options are applied after socket creation but before binding the socket to a port
+    STATE_PREBIND = 0;
+    // Socket options are applied after binding the socket to a port but before calling listen()
+    STATE_BOUND = 1;
+    // Socket options are applied after calling listen()
+    STATE_LISTENING = 2;
+  }
+  // The state in which the option will be applied. When used in BindConfig
+  // STATE_PREBIND is currently the only valid value.
+  SocketState state = 6
+      [(validate.rules).message.required = true, (validate.rules).enum.defined_only = true];
+}
+
+// Runtime derived FractionalPercent with defaults for when the numerator or denominator is not
+// specified via a runtime key.
+message RuntimeFractionalPercent {
+  // Default value if the runtime value's for the numerator/denominator keys are not available.
+  envoy.type.FractionalPercent default_value = 1 [(validate.rules).message.required = true];
+
+  // Runtime key for a YAML representation of a FractionalPercent.
+  string runtime_key = 2;
+}
+
+// Identifies a specific ControlPlane instance that Envoy is connected to.
+message ControlPlane {
+  // An opaque control plane identifier that uniquely identifies an instance
+  // of control plane. This can be used to identify which control plane instance,
+  // the Envoy is connected to.
+  string identifier = 1;
+}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto
new file mode 100644
index 0000000..b387433
--- /dev/null
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto
@@ -0,0 +1,335 @@
+syntax = "proto3";
+
+package envoy.data.accesslog.v2;
+
+option java_outer_classname = "AccesslogProto";
+option java_multiple_files = true;
+option java_package = "io.envoyproxy.envoy.data.accesslog.v2";
+
+import "envoy/api/v2/core/address.proto";
+import "envoy/api/v2/core/base.proto";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
+import "gogoproto/gogo.proto";
+import "validate/validate.proto";
+
+option (gogoproto.stable_marshaler_all) = true;
+
+// [#protodoc-title: gRPC access logs]
+// Envoy access logs describe incoming interaction with Envoy over a fixed
+// period of time, and typically cover a single request/response exchange,
+// (e.g. HTTP), stream (e.g. over HTTP/gRPC), or proxied connection (e.g. TCP).
+// Access logs contain fields defined in protocol-specific protobuf messages.
+//
+// Except where explicitly declared otherwise, all fields describe
+// *downstream* interaction between Envoy and a connected client.
+// Fields describing *upstream* interaction will explicitly include ``upstream``
+// in their name.
+
+// [#not-implemented-hide:]
+message TCPAccessLogEntry {
+  // Common properties shared by all Envoy access logs.
+  AccessLogCommon common_properties = 1;
+}
+
+message HTTPAccessLogEntry {
+  // Common properties shared by all Envoy access logs.
+  AccessLogCommon common_properties = 1;
+
+  // HTTP version
+  enum HTTPVersion {
+    PROTOCOL_UNSPECIFIED = 0;
+    HTTP10 = 1;
+    HTTP11 = 2;
+    HTTP2 = 3;
+  }
+  HTTPVersion protocol_version = 2;
+
+  // Description of the incoming HTTP request.
+  HTTPRequestProperties request = 3;
+
+  // Description of the outgoing HTTP response.
+  HTTPResponseProperties response = 4;
+}
+
+// Defines fields that are shared by all Envoy access logs.
+message AccessLogCommon {
+  // [#not-implemented-hide:]
+  // This field indicates the rate at which this log entry was sampled.
+  // Valid range is (0.0, 1.0].
+  double sample_rate = 1 [(validate.rules).double.gt = 0.0, (validate.rules).double.lte = 1.0];
+
+  // This field is the remote/origin address on which the request from the user was received.
+  // Note: This may not be the physical peer. E.g, if the remote address is inferred from for
+  // example the x-forwarder-for header, proxy protocol, etc.
+  envoy.api.v2.core.Address downstream_remote_address = 2;
+
+  // This field is the local/destination address on which the request from the user was received.
+  envoy.api.v2.core.Address downstream_local_address = 3;
+
+  // If the connection is secure,S this field will contain TLS properties.
+  TLSProperties tls_properties = 4;
+
+  // The time that Envoy started servicing this request. This is effectively the time that the first
+  // downstream byte is received.
+  google.protobuf.Timestamp start_time = 5 [(gogoproto.stdtime) = true];
+
+  // Interval between the first downstream byte received and the last
+  // downstream byte received (i.e. time it takes to receive a request).
+  google.protobuf.Duration time_to_last_rx_byte = 6 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the first upstream byte sent. There may
+  // by considerable delta between *time_to_last_rx_byte* and this value due to filters.
+  // Additionally, the same caveats apply as documented in *time_to_last_downstream_tx_byte* about
+  // not accounting for kernel socket buffer time, etc.
+  google.protobuf.Duration time_to_first_upstream_tx_byte = 7 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the last upstream byte sent. There may
+  // by considerable delta between *time_to_last_rx_byte* and this value due to filters.
+  // Additionally, the same caveats apply as documented in *time_to_last_downstream_tx_byte* about
+  // not accounting for kernel socket buffer time, etc.
+  google.protobuf.Duration time_to_last_upstream_tx_byte = 8 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the first upstream
+  // byte received (i.e. time it takes to start receiving a response).
+  google.protobuf.Duration time_to_first_upstream_rx_byte = 9 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the last upstream
+  // byte received (i.e. time it takes to receive a complete response).
+  google.protobuf.Duration time_to_last_upstream_rx_byte = 10 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the first downstream byte sent.
+  // There may be a considerable delta between the *time_to_first_upstream_rx_byte* and this field
+  // due to filters. Additionally, the same caveats apply as documented in
+  // *time_to_last_downstream_tx_byte* about not accounting for kernel socket buffer time, etc.
+  google.protobuf.Duration time_to_first_downstream_tx_byte = 11 [(gogoproto.stdduration) = true];
+
+  // Interval between the first downstream byte received and the last downstream byte sent.
+  // Depending on protocol, buffering, windowing, filters, etc. there may be a considerable delta
+  // between *time_to_last_upstream_rx_byte* and this field. Note also that this is an approximate
+  // time. In the current implementation it does not include kernel socket buffer time. In the
+  // current implementation it also does not include send window buffering inside the HTTP/2 codec.
+  // In the future it is likely that work will be done to make this duration more accurate.
+  google.protobuf.Duration time_to_last_downstream_tx_byte = 12 [(gogoproto.stdduration) = true];
+
+  // The upstream remote/destination address that handles this exchange. This does not include
+  // retries.
+  envoy.api.v2.core.Address upstream_remote_address = 13;
+
+  // The upstream local/origin address that handles this exchange. This does not include retries.
+  envoy.api.v2.core.Address upstream_local_address = 14;
+
+  // The upstream cluster that *upstream_remote_address* belongs to.
+  string upstream_cluster = 15;
+
+  // Flags indicating occurrences during request/response processing.
+  ResponseFlags response_flags = 16;
+
+  // All metadata encountered during request processing, including endpoint
+  // selection.
+  //
+  // This can be used to associate IDs attached to the various configurations
+  // used to process this request with the access log entry. For example, a
+  // route created from a higher level forwarding rule with some ID can place
+  // that ID in this field and cross reference later. It can also be used to
+  // determine if a canary endpoint was used or not.
+  envoy.api.v2.core.Metadata metadata = 17;
+
+  // If upstream connection failed due to transport socket (e.g. TLS handshake), provides the
+  // failure reason from the transport socket. The format of this field depends on the configured
+  // upstream transport socket. Common TLS failures are in
+  // :ref:`TLS trouble shooting <arch_overview_ssl_trouble_shooting>`.
+  string upstream_transport_failure_reason = 18;
+}
+
+// Flags indicating occurrences during request/response processing.
+message ResponseFlags {
+  // Indicates local server healthcheck failed.
+  bool failed_local_healthcheck = 1;
+
+  // Indicates there was no healthy upstream.
+  bool no_healthy_upstream = 2;
+
+  // Indicates an there was an upstream request timeout.
+  bool upstream_request_timeout = 3;
+
+  // Indicates local codec level reset was sent on the stream.
+  bool local_reset = 4;
+
+  // Indicates remote codec level reset was received on the stream.
+  bool upstream_remote_reset = 5;
+
+  // Indicates there was a local reset by a connection pool due to an initial connection failure.
+  bool upstream_connection_failure = 6;
+
+  // Indicates the stream was reset due to an upstream connection termination.
+  bool upstream_connection_termination = 7;
+
+  // Indicates the stream was reset because of a resource overflow.
+  bool upstream_overflow = 8;
+
+  // Indicates no route was found for the request.
+  bool no_route_found = 9;
+
+  // Indicates that the request was delayed before proxying.
+  bool delay_injected = 10;
+
+  // Indicates that the request was aborted with an injected error code.
+  bool fault_injected = 11;
+
+  // Indicates that the request was rate-limited locally.
+  bool rate_limited = 12;
+
+  message Unauthorized {
+    // Reasons why the request was unauthorized
+    enum Reason {
+      REASON_UNSPECIFIED = 0;
+      // The request was denied by the external authorization service.
+      EXTERNAL_SERVICE = 1;
+    }
+
+    Reason reason = 1;
+  }
+
+  // Indicates if the request was deemed unauthorized and the reason for it.
+  Unauthorized unauthorized_details = 13;
+
+  // Indicates that the request was rejected because there was an error in rate limit service.
+  bool rate_limit_service_error = 14;
+
+  // Indicates the stream was reset due to a downstream connection termination.
+  bool downstream_connection_termination = 15;
+
+  // Indicates that the upstream retry limit was exceeded, resulting in a downstream error.
+  bool upstream_retry_limit_exceeded = 16;
+
+  // Indicates that the stream idle timeout was hit, resulting in a downstream 408.
+  bool stream_idle_timeout = 17;
+}
+
+// Properties of a negotiated TLS connection.
+message TLSProperties {
+  // [#not-implemented-hide:]
+  enum TLSVersion {
+    VERSION_UNSPECIFIED = 0;
+    TLSv1 = 1;
+    TLSv1_1 = 2;
+    TLSv1_2 = 3;
+    TLSv1_3 = 4;
+  }
+  // [#not-implemented-hide:]
+  // Version of TLS that was negotiated.
+  TLSVersion tls_version = 1;
+
+  // [#not-implemented-hide:]
+  // TLS cipher suite negotiated during handshake. The value is a
+  // four-digit hex code defined by the IANA TLS Cipher Suite Registry
+  // (e.g. ``009C`` for ``TLS_RSA_WITH_AES_128_GCM_SHA256``).
+  //
+  // Here it is expressed as an integer.
+  google.protobuf.UInt32Value tls_cipher_suite = 2;
+
+  // SNI hostname from handshake.
+  string tls_sni_hostname = 3;
+
+  message CertificateProperties {
+    message SubjectAltName {
+      oneof san {
+        string uri = 1;
+        // [#not-implemented-hide:]
+        string dns = 2;
+      }
+    }
+
+    // SANs present in the certificate.
+    repeated SubjectAltName subject_alt_name = 1;
+
+    // The subject field of the certificate.
+    string subject = 2;
+  }
+
+  // Properties of the local certificate used to negotiate TLS.
+  CertificateProperties local_certificate_properties = 4;
+
+  // Properties of the peer certificate used to negotiate TLS.
+  CertificateProperties peer_certificate_properties = 5;
+}
+
+message HTTPRequestProperties {
+  // The request method (RFC 7231/2616).
+  // [#comment:TODO(htuch): add (validate.rules).enum.defined_only = true once
+  // https://github.com/lyft/protoc-gen-validate/issues/42 is resolved.]
+  envoy.api.v2.core.RequestMethod request_method = 1;
+
+  // The scheme portion of the incoming request URI.
+  string scheme = 2;
+
+  // HTTP/2 ``:authority`` or HTTP/1.1 ``Host`` header value.
+  string authority = 3;
+
+  // The port of the incoming request URI
+  // (unused currently, as port is composed onto authority).
+  google.protobuf.UInt32Value port = 4;
+
+  // The path portion from the incoming request URI.
+  string path = 5;
+
+  // Value of the ``User-Agent`` request header.
+  string user_agent = 6;
+
+  // Value of the ``Referer`` request header.
+  string referer = 7;
+
+  // Value of the ``X-Forwarded-For`` request header.
+  string forwarded_for = 8;
+
+  // Value of the ``X-Request-Id`` request header
+  //
+  // This header is used by Envoy to uniquely identify a request.
+  // It will be generated for all external requests and internal requests that
+  // do not already have a request ID.
+  string request_id = 9;
+
+  // Value of the ``X-Envoy-Original-Path`` request header.
+  string original_path = 10;
+
+  // Size of the HTTP request headers in bytes.
+  //
+  // This value is captured from the OSI layer 7 perspective, i.e. it does not
+  // include overhead from framing or encoding at other networking layers.
+  uint64 request_headers_bytes = 11;
+
+  // Size of the HTTP request body in bytes.
+  //
+  // This value is captured from the OSI layer 7 perspective, i.e. it does not
+  // include overhead from framing or encoding at other networking layers.
+  uint64 request_body_bytes = 12;
+
+  // Map of additional headers that have been configured to be logged.
+  map<string, string> request_headers = 13;
+}
+
+message HTTPResponseProperties {
+  // The HTTP response code returned by Envoy.
+  google.protobuf.UInt32Value response_code = 1;
+
+  // Size of the HTTP response headers in bytes.
+  //
+  // This value is captured from the OSI layer 7 perspective, i.e. it does not
+  // include overhead from framing or encoding at other networking layers.
+  uint64 response_headers_bytes = 2;
+
+  // Size of the HTTP response body in bytes.
+  //
+  // This value is captured from the OSI layer 7 perspective, i.e. it does not
+  // include overhead from framing or encoding at other networking layers.
+  uint64 response_body_bytes = 3;
+
+  // Map of additional headers configured to be logged.
+  map<string, string> response_headers = 4;
+
+  // Map of trailers configured to be logged.
+  map<string, string> response_trailers = 5;
+}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto
new file mode 100644
index 0000000..1ee6ccd
--- /dev/null
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto
@@ -0,0 +1,73 @@
+syntax = "proto3";
+
+package envoy.service.accesslog.v2;
+
+option java_outer_classname = "AlsProto";
+option java_multiple_files = true;
+option java_package = "io.envoyproxy.envoy.service.accesslog.v2";
+option go_package = "v2";
+option java_generic_services = true;
+
+import "envoy/api/v2/core/base.proto";
+import "envoy/data/accesslog/v2/accesslog.proto";
+
+import "validate/validate.proto";
+
+// [#protodoc-title: gRPC Access Log Service (ALS)]
+
+// Service for streaming access logs from Envoy to an access log server.
+service AccessLogService {
+  // Envoy will connect and send StreamAccessLogsMessage messages forever. It does not expect any
+  // response to be sent as nothing would be done in the case of failure. The server should
+  // disconnect if it expects Envoy to reconnect. In the future we may decide to add a different
+  // API for "critical" access logs in which Envoy will buffer access logs for some period of time
+  // until it gets an ACK so it could then retry. This API is designed for high throughput with the
+  // expectation that it might be lossy.
+  rpc StreamAccessLogs(stream StreamAccessLogsMessage) returns (StreamAccessLogsResponse) {
+  }
+}
+
+// Empty response for the StreamAccessLogs API. Will never be sent. See below.
+message StreamAccessLogsResponse {
+}
+
+// Stream message for the StreamAccessLogs API. Envoy will open a stream to the server and stream
+// access logs without ever expecting a response.
+message StreamAccessLogsMessage {
+  message Identifier {
+    // The node sending the access log messages over the stream.
+    envoy.api.v2.core.Node node = 1 [(validate.rules).message.required = true];
+
+    // The friendly name of the log configured in :ref:`CommonGrpcAccessLogConfig
+    // <envoy_api_msg_config.accesslog.v2.CommonGrpcAccessLogConfig>`.
+    string log_name = 2 [(validate.rules).string.min_bytes = 1];
+  }
+
+  // Identifier data that will only be sent in the first message on the stream. This is effectively
+  // structured metadata and is a performance optimization.
+  Identifier identifier = 1;
+
+  // Wrapper for batches of HTTP access log entries.
+  message HTTPAccessLogEntries {
+    repeated envoy.data.accesslog.v2.HTTPAccessLogEntry log_entry = 1
+        [(validate.rules).repeated .min_items = 1];
+  }
+
+  // [#not-implemented-hide:]
+  // Wrapper for batches of TCP access log entries.
+  message TCPAccessLogEntries {
+    repeated envoy.data.accesslog.v2.TCPAccessLogEntry log_entry = 1
+        [(validate.rules).repeated .min_items = 1];
+  }
+
+  // Batches of log entries of a single type. Generally speaking, a given stream should only
+  // ever include one type of log entry.
+  oneof log_entries {
+    option (validate.required) = true;
+
+    HTTPAccessLogEntries http_logs = 2;
+
+    // [#not-implemented-hide:]
+    TCPAccessLogEntries tcp_logs = 3;
+  }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
index 6d3be3b..f3ea308 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.aop.server.receiver.mesh;
 import com.google.gson.JsonObject;
 import org.apache.skywalking.apm.network.common.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
 import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
@@ -46,12 +47,18 @@ public class ServiceMeshMetricDataDecorator {
         boolean isRegistered = true;
         sourceServiceId = origin.getSourceServiceId();
         if (sourceServiceId == Const.NONE) {
-            sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(origin.getSourceServiceName(), null);
-            if (sourceServiceId != Const.NONE) {
-                getNewDataBuilder().setSourceServiceId(sourceServiceId);
-            } else {
-                isRegistered = false;
+            String sourceServiceName = origin.getSourceServiceName();
+            // sourceServiceName is optional now,
+            // which means only generate dest service traffic.
+            if (!StringUtil.isEmpty(sourceServiceName)) {
+                sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(sourceServiceName, null);
+                if (sourceServiceId != Const.NONE) {
+                    getNewDataBuilder().setSourceServiceId(sourceServiceId);
+                } else {
+                    isRegistered = false;
+                }
             }
+            // No service name, service instance will be ignored too.
         }
         sourceServiceInstanceId = origin.getSourceServiceInstanceId();
         if (sourceServiceId != Const.NONE && sourceServiceInstanceId == Const.NONE) {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index dbe069b..fe2bb43 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -98,8 +98,13 @@ public class TelemetryDataDispatcher {
             toServiceInstance(decorator, minuteTimeBucket);
             toEndpoint(decorator, minuteTimeBucket);
         }
-        toServiceRelation(decorator, minuteTimeBucket);
-        toServiceInstanceRelation(decorator, minuteTimeBucket);
+
+        int sourceServiceId = metrics.getSourceServiceId();
+        // Don't generate relation, if no source.
+        if (sourceServiceId != Const.NONE) {
+            toServiceRelation(decorator, minuteTimeBucket);
+            toServiceInstanceRelation(decorator, minuteTimeBucket);
+        }
     }
 
     private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
@@ -108,20 +113,23 @@ public class TelemetryDataDispatcher {
         int heartbeatCycle = 10000;
         // source
         int instanceId = metrics.getSourceServiceInstanceId();
-        ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
-        if (Objects.nonNull(serviceInstanceInventory)) {
-            if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
-                // trigger heartbeat every 10s.
-                SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metrics.getSourceServiceInstanceId(), metrics.getEndTime());
-                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metrics.getEndTime());
+        // Don't generate source heartbeat, if no source.
+        if (instanceId != Const.NONE) {
+            ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+            if (Objects.nonNull(serviceInstanceInventory)) {
+                if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+                    // trigger heartbeat every 10s.
+                    SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metrics.getSourceServiceInstanceId(), metrics.getEndTime());
+                    SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metrics.getEndTime());
+                }
+            } else {
+                logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
             }
-        } else {
-            logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }
 
         // dest
         instanceId = metrics.getDestServiceInstanceId();
-        serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+        ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
             if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
                 // trigger heartbeat every 10s.
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 05edfc0..708b1a6 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -105,6 +105,7 @@ istio-telemetry:
   default:
 envoy-metric:
   default:
+#    alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
 #receiver_zipkin:
 #  default:
 #    host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index ced2d43..b70ef71 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -105,6 +105,7 @@ istio-telemetry:
   default:
 envoy-metric:
   default:
+#    alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
 #receiver_zipkin:
 #  default:
 #    host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}