You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/12/28 07:29:53 UTC

[skywalking] 01/01: Enhance Envoy metrics service analyzer by MAL

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch envoy/mal
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 03fb2b456f3c34803f3357f163b53530572383a5
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Mon Dec 28 15:29:28 2020 +0800

    Enhance Envoy metrics service analyzer by MAL
---
 docs/en/concepts-and-designs/mal.md                |   8 +-
 .../envoy/examples/metrics/docker-compose.yaml     |   2 +-
 .../oap/meter/analyzer/dsl/SampleFamily.java       |  25 ++++-
 .../prometheus/PrometheusMetricConverter.java      |  23 +++--
 .../src/main/resources/application.yml             |   1 +
 .../main/resources/envoy-metrics-rules/envoy.yaml  |  42 +++++++++
 .../src/main/resources/oal/envoy.oal               |  22 -----
 .../otel-oc-rules/istio-controlplane.yaml          |   2 +-
 .../resources/ui-initialized-templates/istio.yml   |  31 ++++++-
 .../envoy-metrics-receiver-plugin/pom.xml          |   5 +
 .../receiver/envoy/EnvoyMetricReceiverConfig.java  |  11 +++
 .../envoy/EnvoyMetricReceiverProvider.java         |   2 +-
 .../receiver/envoy/MetricServiceGRPCHandler.java   | 102 +++++++++------------
 .../adapters/ProtoMetricFamily2MetricsAdapter.java |  85 +++++++++++++++++
 14 files changed, 258 insertions(+), 103 deletions(-)

diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index 30834af..b1edc70 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -59,7 +59,7 @@ Between two scalars: they evaluate to another scalar that is the result of the o
 1 + 2
 ```
 
-Between a sample family and a scalar, the operator is applied to the value of every sample in the smaple family. For example:
+Between a sample family and a scalar, the operator is applied to the value of every sample in the sample family. For example:
 
 ```
 instance_trace_count + 2
@@ -110,9 +110,9 @@ Sample family supports the following aggregation operations that can be used to
 resulting in a new sample family of fewer samples(even single one) with aggregated values:
 
  - sum (calculate sum over dimensions)
- - min (select minimum over dimensions) (TODO)
- - max (select maximum over dimensions) (TODO)
- - avg (calculate the average over dimensions) (TODO)
+ - min (select minimum over dimensions)
+ - max (select maximum over dimensions)
+ - avg (calculate the average over dimensions)
  
 These operations can be used to aggregate over all label dimensions or preserve distinct dimensions by inputting `by` parameter. 
 
diff --git a/docs/en/setup/envoy/examples/metrics/docker-compose.yaml b/docs/en/setup/envoy/examples/metrics/docker-compose.yaml
index 2250d52..6f069da 100644
--- a/docs/en/setup/envoy/examples/metrics/docker-compose.yaml
+++ b/docs/en/setup/envoy/examples/metrics/docker-compose.yaml
@@ -17,7 +17,7 @@
 version: "3"
 services:
   envoy:
-    image: envoyproxy/envoy-alpine:latest
+    image: envoyproxy/envoy-alpine:v1.16.2
     command: /usr/local/bin/envoy -c /etc/envoy.yaml --service-cluster envoy-proxy
     ports:
     - 10000:10000
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index 24e9fb4..c1115d1 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -28,6 +28,7 @@ import groovy.lang.Closure;
 import io.vavr.Function2;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
+import java.util.function.DoubleBinaryOperator;
 import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
@@ -161,12 +162,32 @@ public class SampleFamily {
 
     /* Aggregation operators */
     public SampleFamily sum(List<String> by) {
+        return aggregate(by, Double::sum);
+    }
+
+    public SampleFamily max(List<String> by) {
+        return aggregate(by, Double::max);
+    }
+
+    public SampleFamily min(List<String> by) {
+        return aggregate(by, Double::min);
+    }
+
+    public SampleFamily avg(List<String> by) {
+        final SampleFamily summation = aggregate(by, Double::sum);
+        for (int i = 0; i < summation.samples.length; i++) {
+            summation.samples[i] = summation.samples[i].newValue(s -> s / summation.samples.length);
+        }
+        return summation;
+    }
+
+    protected SampleFamily aggregate(List<String> by, DoubleBinaryOperator aggregator) {
         ExpressionParsingContext.get().ifPresent(ctx -> ctx.aggregationLabels.addAll(by));
         if (this == EMPTY) {
             return EMPTY;
         }
         if (by == null) {
-            double result = Arrays.stream(samples).mapToDouble(s -> s.value).reduce(Double::sum).orElse(0.0D);
+            double result = Arrays.stream(samples).mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D);
             return SampleFamily.build(this.context, newSample(ImmutableMap.of(), samples[0].timestamp, result));
         }
         return SampleFamily.build(this.context, Arrays.stream(samples)
@@ -176,7 +197,7 @@ public class SampleFamily {
             .collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())))
             .entrySet().stream()
             .map(entry -> newSample(entry.getKey(), entry.getValue().get(0).timestamp, entry.getValue().stream()
-                .mapToDouble(s -> s.value).reduce(Double::sum).orElse(0.0D)))
+                .mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D)))
             .toArray(Sample[]::new));
     }
 
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java
index 47c5e89..9af3960 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java
@@ -86,15 +86,15 @@ public class PrometheusMetricConverter {
     private Stream<Tuple2<String, SampleFamily>> convertMetric(Metric metric) {
         return Match(metric).of(
             Case($(instanceOf(Histogram.class)), t -> Stream.of(
-                Tuple.of(metric.getName() + "_count", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_count")
+                Tuple.of(escapedName(metric.getName() + "_count"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_count"))
                     .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Histogram) metric).getSampleCount()).build()).build()),
-                Tuple.of(metric.getName() + "_sum", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_sum")
+                Tuple.of(escapedName(metric.getName() + "_sum"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_sum"))
                     .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Histogram) metric).getSampleSum()).build()).build()),
                 convertToSample(metric).orElse(NIL))),
             Case($(instanceOf(Summary.class)), t -> Stream.of(
-                Tuple.of(metric.getName() + "_count", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_count")
+                Tuple.of(escapedName(metric.getName() + "_count"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_count"))
                     .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Summary) metric).getSampleCount()).build()).build()),
-                Tuple.of(metric.getName() + "_sum", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_sum")
+                Tuple.of(escapedName(metric.getName() + "_sum"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_sum"))
                     .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Summary) metric).getSampleSum()).build()).build()),
                 convertToSample(metric).orElse(NIL))),
             Case($(), t -> Stream.of(convertToSample(metric).orElse(NIL)))
@@ -104,13 +104,13 @@ public class PrometheusMetricConverter {
     private Optional<Tuple2<String, SampleFamily>> convertToSample(Metric metric) {
         Sample[] ss = Match(metric).of(
             Case($(instanceOf(Counter.class)), t -> Collections.singletonList(Sample.builder()
-                .name(t.getName())
+                .name(escapedName(t.getName()))
                 .labels(ImmutableMap.copyOf(t.getLabels()))
                 .timestamp(t.getTimestamp())
                 .value(t.getValue())
                 .build())),
             Case($(instanceOf(Gauge.class)), t -> Collections.singletonList(Sample.builder()
-                .name(t.getName())
+                .name(escapedName(t.getName()))
                 .labels(ImmutableMap.copyOf(t.getLabels()))
                 .timestamp(t.getTimestamp())
                 .value(t.getValue())
@@ -118,7 +118,7 @@ public class PrometheusMetricConverter {
             Case($(instanceOf(Histogram.class)), t -> t.getBuckets()
                 .entrySet().stream()
                 .map(b -> Sample.builder()
-                    .name(t.getName())
+                    .name(escapedName(t.getName()))
                     .labels(ImmutableMap.<String, String>builder()
                         .putAll(t.getLabels())
                         .put("le", b.getKey().toString())
@@ -129,7 +129,7 @@ public class PrometheusMetricConverter {
             Case($(instanceOf(Summary.class)),
                 t -> t.getQuantiles().entrySet().stream()
                     .map(b -> Sample.builder()
-                        .name(t.getName())
+                        .name(escapedName(t.getName()))
                         .labels(ImmutableMap.<String, String>builder()
                             .putAll(t.getLabels())
                             .put("quantile", b.getKey().toString())
@@ -141,6 +141,11 @@ public class PrometheusMetricConverter {
         if (ss.length < 1) {
             return Optional.empty();
         }
-        return Optional.of(Tuple.of(metric.getName(), SampleFamilyBuilder.newBuilder(ss).build()));
+        return Optional.of(Tuple.of(escapedName(metric.getName()), SampleFamilyBuilder.newBuilder(ss).build()));
+    }
+
+    // Returns the given metric name with every "." replaced by "_" (e.g. "server.memory_heap_size" becomes "server_memory_heap_size").
+    protected String escapedName(final String name) {
+        return name.replaceAll("\\.", "_");
     }
 }
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 04edb32..5fbadb0 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -266,6 +266,7 @@ envoy-metric:
     # to append the version number to the service name.
     # Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell.
     k8sServiceNameRule: ${K8S_SERVICE_NAME_RULE:"${service.metadata.name}"}
+    enabledMALRules: ${SW_ENVOY_METRIC_MAL_RULES:"envoy"}
 
 prometheus-fetcher:
   selector: ${SW_PROMETHEUS_FETCHER:-}
diff --git a/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml b/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml
new file mode 100644
index 0000000..bac0d36
--- /dev/null
+++ b/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+#    "PT20.345S" -- parses as "20.345 seconds"
+#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
+#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
+#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
+#    "P-6H3M"    -- parses as "-6 hours and +3 minutes"
+#    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
+#    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
+# </pre>
+
+expSuffix: tag({tags -> tags.cluster = 'istio::' + tags.cluster}).instance(['cluster'], ['instance'])
+metricPrefix: envoy
+metricsRules:
+  - name: heap_memory_max_used
+    exp: server_memory_heap_size.max(['cluster', 'instance'])
+  - name: heap_memory_used
+    exp: server_memory_heap_size
+  - name: total_connections_used
+    exp: server_total_connections.max(['cluster', 'instance'])
+  - name: parent_connections_used
+    exp: server_parent_connections.max(['cluster', 'instance'])
diff --git a/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal b/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal
deleted file mode 100644
index 90d7b4d..0000000
--- a/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Envoy instance metrics
-envoy_heap_memory_max_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.memory_heap_size").maxDouble();
-envoy_total_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.total_connections").maxDouble();
-envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.parent_connections").maxDouble();
\ No newline at end of file
diff --git a/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml b/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml
index d32cf5a..3a5c3d3 100644
--- a/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml
+++ b/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml
@@ -28,7 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
-expSuffix: tag({tags -> tags.cluster = 'istio-ctrl::' + tags.cluster}).service(['cluster', 'app'])
+expSuffix: tag({tags -> tags.cluster = 'istio::' + tags.cluster}).service(['cluster', 'app'])
 metricPrefix: meter_istio
 metricsRules:
   ## Resource usage
diff --git a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml
index bea98db..ed89ce8 100644
--- a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml
+++ b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml
@@ -30,7 +30,7 @@ templates:
         {
           "name": "Istio",
           "type": "service",
-          "serviceGroup": "istio-ctrl",
+          "serviceGroup": "istio",
           "children": [
             {
               "name": "Control Plane",
@@ -164,6 +164,33 @@ templates:
                   "chartType": "ChartArea"
                 }
               ]
+            },
+            {
+              "name": "Data Plane",
+              "children": [
+                {
+                  "width": "3",
+                  "title": "Heap Memory Max Used",
+                  "height": 350,
+                  "entityType": "ServiceInstance",
+                  "independentSelector": false,
+                  "metricType": "REGULAR_VALUE",
+                  "metricName": "envoy_heap_memory_max_used,envoy_heap_memory_used",
+                  "queryMetricType": "readMetricsValues",
+                  "chartType": "ChartLine"
+                },
+                {
+                  "width": "3",
+                  "title": "Connections Used",
+                  "height": 350,
+                  "entityType": "ServiceInstance",
+                  "independentSelector": false,
+                  "metricType": "REGULAR_VALUE",
+                  "metricName": "envoy_total_connections_used,envoy_parent_connections_used",
+                  "queryMetricType": "readMetricsValues",
+                  "chartType": "ChartLine"
+                }
+              ]
             }
           ]
         }
@@ -172,4 +199,4 @@ templates:
     # False means providing a basic template, user needs to add it manually.
     activated: true
     # True means wouldn't show up on the dashboard. Only keeps the definition in the storage.
-    disabled: false
\ No newline at end of file
+    disabled: false
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
index 93eed36..c1caf8b 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
@@ -36,6 +36,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
+            <artifactId>meter-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
             <artifactId>skywalking-mesh-receiver-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
index 5819d80..5a7b31b 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java
@@ -18,13 +18,17 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy;
 
+import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.Getter;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 
 public class EnvoyMetricReceiverConfig extends ModuleConfig {
     @Getter
@@ -32,6 +36,8 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
     private String alsHTTPAnalysis;
     @Getter
     private String k8sServiceNameRule;
+    @Getter
+    private String enabledMALRules;
 
     public List<String> getAlsHTTPAnalysis() {
         if (Strings.isNullOrEmpty(alsHTTPAnalysis)) {
@@ -39,4 +45,9 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
         }
         return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
     }
+
+    public List<Rule> rules() throws ModuleStartException {
+        final List<String> enabledRules = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(getEnabledMALRules());
+        return Rules.loadRules("envoy-metrics-rules", enabledRules);
+    }
 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
index 28a9126..df723b9 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java
@@ -68,7 +68,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
                         .getService(OALEngineLoaderService.class)
                         .load(EnvoyOALDefine.INSTANCE);
 
-            service.addHandler(new MetricServiceGRPCHandler(getManager()));
+            service.addHandler(new MetricServiceGRPCHandler(getManager(), config));
         }
         service.addHandler(new AccessLogServiceGRPCHandler(getManager(), config));
     }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
index 0ed9b3e..1f62a33 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
@@ -25,16 +25,22 @@ import io.envoyproxy.envoy.service.metrics.v2.StreamMetricsResponse;
 import io.grpc.stub.StreamObserver;
 import io.prometheus.client.Metrics;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.source.EnvoyInstanceMetric;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
+import org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters.ProtoMetricFamily2MetricsAdapter;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -44,10 +50,11 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 @Slf4j
 public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceImplBase {
     private final SourceReceiver sourceReceiver;
-    private CounterMetrics counter;
-    private HistogramMetrics histogram;
+    private final CounterMetrics counter;
+    private final HistogramMetrics histogram;
+    private final List<PrometheusMetricConverter> converters;
 
-    public MetricServiceGRPCHandler(ModuleManager moduleManager) {
+    public MetricServiceGRPCHandler(final ModuleManager moduleManager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
         sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
@@ -60,6 +67,13 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
             "envoy_metric_in_latency", "The process latency of service metrics receiver", MetricsTag.EMPTY_KEY,
             MetricsTag.EMPTY_VALUE
         );
+
+        final MeterSystem meterSystem = moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
+
+        converters = config.rules()
+                           .stream()
+                           .map(rule -> new PrometheusMetricConverter(rule, meterSystem))
+                           .collect(Collectors.toList());
     }
 
     @Override
@@ -79,17 +93,15 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
                     isFirst = false;
                     StreamMetricsMessage.Identifier identifier = message.getIdentifier();
                     Node node = identifier.getNode();
-                    if (node != null) {
-                        String nodeId = node.getId();
-                        if (!StringUtil.isEmpty(nodeId)) {
-                            serviceInstanceName = nodeId;
-                        }
-                        String cluster = node.getCluster();
-                        if (!StringUtil.isEmpty(cluster)) {
-                            serviceName = cluster;
-                            if (serviceInstanceName == null) {
-                                serviceInstanceName = serviceName;
-                            }
+                    String nodeId = node.getId();
+                    if (!StringUtil.isEmpty(nodeId)) {
+                        serviceInstanceName = nodeId;
+                    }
+                    String cluster = node.getCluster();
+                    if (!StringUtil.isEmpty(cluster)) {
+                        serviceName = cluster;
+                        if (serviceInstanceName == null) {
+                            serviceInstanceName = serviceName;
                         }
                     }
 
@@ -108,53 +120,23 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
                 if (StringUtil.isNotEmpty(serviceName) && StringUtil.isNotEmpty(serviceInstanceName)) {
                     List<Metrics.MetricFamily> list = message.getEnvoyMetricsList();
                     boolean needHeartbeatUpdate = true;
-                    for (int i = 0; i < list.size(); i++) {
+
+                    for (final Metrics.MetricFamily metricFamily : list) {
                         counter.inc();
 
                         final String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal);
-                        final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(
-                            serviceId, serviceInstanceName);
-
-                        HistogramMetrics.Timer timer = histogram.createTimer();
-                        try {
-                            Metrics.MetricFamily metricFamily = list.get(i);
-                            double value = 0;
-                            long timestamp = 0;
-                            switch (metricFamily.getType()) {
-                                case GAUGE:
-                                    for (Metrics.Metric metrics : metricFamily.getMetricList()) {
-                                        timestamp = metrics.getTimestampMs();
-                                        value = metrics.getGauge().getValue();
-
-                                        if (timestamp > 1000000000000000000L) {
-                                            /**
-                                             * Several versions of envoy in istio.deps send timestamp in nanoseconds,
-                                             * instead of milliseconds(protocol says).
-                                             *
-                                             * Sadly, but have to fix it forcedly.
-                                             *
-                                             * An example of timestamp is '1552303033488741055', clearly it is not in milliseconds.
-                                             *
-                                             * This should be removed in the future.
-                                             */
-                                            timestamp /= 1_000_000;
-                                        }
-
-                                        EnvoyInstanceMetric metricSource = new EnvoyInstanceMetric();
-                                        metricSource.setServiceId(serviceId);
-                                        metricSource.setServiceName(serviceName);
-                                        metricSource.setId(serviceInstanceId);
-                                        metricSource.setName(serviceInstanceName);
-                                        metricSource.setMetricName(metricFamily.getName());
-                                        metricSource.setValue(value);
-                                        metricSource.setTimeBucket(TimeBucket.getMinuteTimeBucket(timestamp));
-                                        sourceReceiver.receive(metricSource);
-                                    }
-                                    break;
-                                default:
-                                    continue;
-                            }
-                            if (needHeartbeatUpdate) {
+
+                        try (final HistogramMetrics.Timer ignored = histogram.createTimer()) {
+                            final ProtoMetricFamily2MetricsAdapter adapter = new ProtoMetricFamily2MetricsAdapter(metricFamily);
+                            final Stream<Metric> metrics = adapter.adapt().peek(it -> {
+                                it.getLabels().putIfAbsent("cluster", serviceName);
+                                it.getLabels().putIfAbsent("instance", serviceInstanceName);
+                            });
+                            converters.forEach(converter -> converter.toMeter(metrics));
+
+                            if (needHeartbeatUpdate && list.get(0).getMetricCount() > 0) {
+                                final long timestamp = adapter.adaptTimestamp(list.get(0).getMetric(0));
+
                                 // Send heartbeat
                                 ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
                                 serviceInstanceUpdate.setName(serviceInstanceName);
@@ -163,8 +145,6 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
                                 sourceReceiver.receive(serviceInstanceUpdate);
                                 needHeartbeatUpdate = false;
                             }
-                        } finally {
-                            timer.finish();
                         }
                     }
                 }
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java
new file mode 100644
index 0000000..578b491
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters;
+
+import io.prometheus.client.Metrics;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Adapts the metrics of one Prometheus protobuf {@link Metrics.MetricFamily} into {@link Metric} objects.
+ *
+ * <p>Only {@code GAUGE} families are converted at the moment; any other family type yields an empty stream.
+ * The wrapped family is {@code protected} and the {@code adapt*} methods are public and non-final, so each
+ * derived field can be customised in a subclass.
+ */
+@RequiredArgsConstructor
+public class ProtoMetricFamily2MetricsAdapter {
+    /**
+     * Timestamps above this value are treated as nanoseconds rather than milliseconds.
+     */
+    private static final long NANOSECOND_THRESHOLD = 1_000_000_000_000_000_000L;
+
+    /**
+     * Divisor that turns a nanosecond timestamp into milliseconds.
+     */
+    private static final long NANOS_PER_MILLI = 1_000_000L;
+
+    protected final Metrics.MetricFamily metricFamily;
+
+    /**
+     * Converts every metric of the wrapped family.
+     *
+     * @return a stream with one {@link Gauge} per metric when the family type is {@code GAUGE};
+     *         an empty stream for every other type
+     */
+    public Stream<Metric> adapt() {
+        // TODO: should adapt more types
+        switch (metricFamily.getType()) {
+            case GAUGE:
+                return metricFamily.getMetricList().stream().map(this::toGauge);
+            default:
+                return Stream.empty();
+        }
+    }
+
+    /**
+     * Builds a {@link Gauge} from a single protobuf metric using the {@code adapt*} methods of this class.
+     */
+    private Metric toGauge(final Metrics.Metric metric) {
+        return Gauge.builder()
+                    .name(adaptMetricsName(metric))
+                    .value(adaptValue(metric))
+                    .timestamp(adaptTimestamp(metric))
+                    .labels(adaptLabels(metric))
+                    .build();
+    }
+
+    /**
+     * Resolves the name of the adapted metric.
+     *
+     * @param metric the protobuf metric being adapted; not used here, the family name is returned for every metric
+     * @return the name of the wrapped metric family
+     */
+    @SuppressWarnings("unused")
+    public String adaptMetricsName(final Metrics.Metric metric) {
+        return metricFamily.getName();
+    }
+
+    /**
+     * Reads the value of a metric from its gauge field.
+     *
+     * @param it the protobuf metric being adapted
+     * @return the gauge value of the metric
+     */
+    public double adaptValue(final Metrics.Metric it) {
+        return it.getGauge().getValue();
+    }
+
+    /**
+     * Collects the label pairs of a metric into a map keyed by label name.
+     *
+     * <p>When a label name occurs more than once the last value is kept. Without a merge function
+     * {@code toMap} throws {@link IllegalStateException} on a duplicate key, and that exception would
+     * surface while the stream returned by {@link #adapt()} is being consumed.
+     *
+     * @param metric the protobuf metric whose labels are read
+     * @return the label values keyed by label name
+     */
+    public Map<String, String> adaptLabels(final Metrics.Metric metric) {
+        return metric.getLabelList()
+                     .stream()
+                     .collect(toMap(
+                         Metrics.LabelPair::getName,
+                         Metrics.LabelPair::getValue,
+                         (previous, latest) -> latest
+                     ));
+    }
+
+    /**
+     * Reads the timestamp of a metric and scales nanosecond values down to milliseconds.
+     *
+     * <p>Several versions of Envoy in istio.deps send the timestamp in nanoseconds instead of the
+     * milliseconds that the protocol specifies; an example is {@code 1552303033488741055}, which is
+     * clearly not in milliseconds. Such values are detected by their magnitude and divided by one
+     * million; all other values are returned unchanged.
+     *
+     * <p>This workaround should be removed in the future.
+     *
+     * @param metric the protobuf metric whose timestamp is read
+     * @return the timestamp of the metric, divided by 1,000,000 if it exceeds 10^18
+     */
+    public long adaptTimestamp(final Metrics.Metric metric) {
+        final long timestamp = metric.getTimestampMs();
+        return timestamp > NANOSECOND_THRESHOLD ? timestamp / NANOS_PER_MILLI : timestamp;
+    }
+}