You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/04 12:59:40 UTC

[skywalking] branch master updated: Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb8c69c67 Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)
7bb8c69c67 is described below

commit 7bb8c69c673df02202646e3a09cc6720e8d2db4d
Author: mrproliu <74...@qq.com>
AuthorDate: Mon Jul 4 20:59:34 2022 +0800

    Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)
---
 docs/en/changes/changes.md                         |   2 +
 docs/en/concepts-and-designs/mal.md                |   6 +
 docs/en/setup/backend/backend-meter.md             |   4 +
 docs/en/setup/backend/backend-zabbix.md            |   4 +
 docs/en/setup/backend/prometheus-metrics.md        |   4 +
 .../provider/meter/config/MeterConfig.java         |   2 +
 .../skywalking/oap/meter/analyzer/Analyzer.java    |  74 ++++++++---
 .../oap/meter/analyzer/MetricConvert.java          |  42 +++++--
 .../oap/meter/analyzer/MetricRuleConfig.java       |  10 ++
 .../skywalking/oap/meter/analyzer/dsl/DSL.java     |   9 +-
 .../ProcessRelationEntityDescription.java          |  48 +++++++
 .../analyzer/dsl/ExpressionParsingContext.java     |   5 -
 .../oap/meter/analyzer/dsl/SampleFamily.java       |  48 ++++++-
 .../analyzer/dsl/registry/ProcessRegistry.java     |  79 ++++++++++++
 .../oap/meter/analyzer/k8s/K8sInfoRegistry.java    |  43 +++++++
 .../oap/meter/analyzer/k8s/Kubernetes.java}        |  22 ++--
 .../oap/meter/analyzer/prometheus/rule/Rule.java   |   2 +
 .../oap/meter/analyzer/dsl/FunctionTest.java       |  13 ++
 .../oap/meter/analyzer/dsl/K8sTagTest.java         |  69 +++++++++--
 .../oap/meter/analyzer/dsl/ScopeTest.java          |  40 ++++++
 .../oap/server/core/analysis/IDManager.java        |  26 ++++
 .../process/ProcessRelationClientSideMetrics.java  | 138 +++++++++++++++++++++
 .../process/ProcessRelationDispatcher.java         |  59 +++++++++
 .../process/ProcessRelationServerSideMetrics.java  | 138 +++++++++++++++++++++
 .../server/core/analysis/meter/MeterEntity.java    |  23 ++++
 .../oap/server/core/analysis/meter/ScopeType.java  |   3 +-
 .../oap/server/core/source/DefaultScopeDefine.java |   2 +
 .../oap/server/core/source/ProcessRelation.java    |  64 ++++++++++
 .../zabbix/provider/config/ZabbixConfig.java       |   2 +
 .../main/resources/otel-oc-rules/k8s-cluster.yaml  |   1 +
 .../main/resources/otel-oc-rules/k8s-instance.yaml |   1 +
 .../main/resources/otel-oc-rules/k8s-service.yaml  |   1 +
 32 files changed, 928 insertions(+), 56 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 5a64514d71..db6ea1c71e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -15,6 +15,8 @@
 * Add component ID(128) for Java Hutool plugin.
 * Add Zipkin query exception handler, response error message for illegal arguments.
 * Fix a NullPointerException in the endpoint analysis, which would cause missing MQ-related `LocalSpan` in the trace.
+* Add `forEach`, `processRelation` function to MAL expression.
+* Add `expPrefix`, `initExp` in MAL config.
 
 #### UI
 
diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index 85d6d693b6..e92ceef7cb 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -211,6 +211,10 @@ Examples:
 #### time
 `time()`: Returns the number of seconds since January 1, 1970 UTC.
 
+#### foreach
+`forEach([string_array], Closure<Void> each)`: Iterates all samples according to the first array argument, and provide two parameters in the second closure argument:
+1. `element`: element in the array.
+2. `tags`: tags in each sample.
 
 ## Down Sampling Operation
 MAL should instruct meter-system on how to downsample for metrics. It doesn't only refer to aggregate raw samples to
@@ -245,6 +249,8 @@ They extract level relevant labels from metric labels, then informs the meter-sy
                                                                       extracts endpoint level labels from the second array argument, extracts layer from `Layer` argument.
  - `serviceRelation(DetectPoint, [source_svc_label1...], [dest_svc_label1...], Layer)` DetectPoint including `DetectPoint.CLIENT` and `DetectPoint.SERVER`,
    extracts `sourceService` labels from the first array argument, extracts `destService` labels from the second array argument, extracts layer from `Layer` argument.
+ - `processRelation(detect_point_label, [service_label1...], [instance_label1...], source_process_id_label, dest_process_id_label)` extracts `DetectPoint` labels from first argument, the label value should be `client` or `server`.
+   extracts `Service` labels from the first array argument, extracts `Instance` labels from the second array argument, extracts `ProcessID` labels from the fourth and fifth arguments of the source and destination.
 
 ## More Examples
 
diff --git a/docs/en/setup/backend/backend-meter.md b/docs/en/setup/backend/backend-meter.md
index 1e48ded8de..dee81764e7 100644
--- a/docs/en/setup/backend/backend-meter.md
+++ b/docs/en/setup/backend/backend-meter.md
@@ -81,8 +81,12 @@ If you're using Spring Sleuth, see [Spring Sleuth Setup](spring-sleuth-setup.md)
 ### Meters configuration
 
 ```yaml
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
 # filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
 filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
 # expSuffix is appended to all expression in this file.
 expSuffix: <string>
 # insert metricPrefix into metric name:  <metricPrefix>_<raw_metric_name>
diff --git a/docs/en/setup/backend/backend-zabbix.md b/docs/en/setup/backend/backend-zabbix.md
index 015ce8afb3..5a14f96ce8 100644
--- a/docs/en/setup/backend/backend-zabbix.md
+++ b/docs/en/setup/backend/backend-zabbix.md
@@ -30,8 +30,12 @@ You can find details on Zabbix agent items from [Zabbix Agent documentation](htt
 ### Configuration file
 
 ```yaml
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
 # insert metricPrefix into metric name:  <metricPrefix>_<raw_metric_name>
 metricPrefix: <string>
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
 # expSuffix is appended to all expression in this file.
 expSuffix: <string>
 # Datasource from Zabbix Item keys.
diff --git a/docs/en/setup/backend/prometheus-metrics.md b/docs/en/setup/backend/prometheus-metrics.md
index f14113c871..8fa784e542 100644
--- a/docs/en/setup/backend/prometheus-metrics.md
+++ b/docs/en/setup/backend/prometheus-metrics.md
@@ -37,8 +37,12 @@ staticConfig:
   # Labels assigned to all metrics fetched from the targets.
   labels:
     [ <labelname>: <labelvalue> ... ]
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
 # filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
 filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
 # expSuffix is appended to all expression in this file.
 expSuffix: <string>
 # insert metricPrefix into metric name:  <metricPrefix>_<raw_metric_name>
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
index c2e8930302..4bc3bec8c7 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
@@ -29,8 +29,10 @@ import java.util.List;
 public class MeterConfig implements MetricRuleConfig {
     private String metricPrefix;
     private String expSuffix;
+    private String expPrefix;
     private String filter;
     private List<Rule> metricsRules;
+    private String initExp;
 
     @Data
     @NoArgsConstructor
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 05b01868cf..28c2c487f4 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -42,11 +42,12 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.FilterExpression;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
-import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
 import org.apache.skywalking.oap.server.core.analysis.Layer;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
 import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationClientSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
@@ -241,10 +242,6 @@ public class Analyzer {
             }
         }
         createMetric(ctx.getScopeType(), metricType.literal, ctx.getDownsampling());
-
-        if (ctx.isRetagByK8sMeta()) {
-            K8sInfoRegistry.getInstance().start();
-        }
     }
 
     private void createMetric(final ScopeType scopeType,
@@ -262,16 +259,12 @@ public class Analyzer {
 
     private void generateTraffic(MeterEntity entity) {
         if (entity.getDetectPoint() != null) {
-            switch (entity.getDetectPoint()) {
-                case SERVER:
-                    entity.setServiceName(entity.getDestServiceName());
-                    toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
-                    serverSide(entity);
+            switch (entity.getScopeType()) {
+                case SERVICE_RELATION:
+                    serviceRelationTraffic(entity);
                     break;
-                case CLIENT:
-                    entity.setServiceName(entity.getSourceServiceName());
-                    toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
-                    clientSide(entity);
+                case PROCESS_RELATION:
+                    processRelationTraffic(entity);
                     break;
                 default:
             }
@@ -309,7 +302,23 @@ public class Analyzer {
         MetricsStreamProcessor.getInstance().in(s);
     }
 
-    private void serverSide(MeterEntity entity) {
+    private void serviceRelationTraffic(MeterEntity entity) {
+        switch (entity.getDetectPoint()) {
+            case SERVER:
+                entity.setServiceName(entity.getDestServiceName());
+                toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
+                serviceRelationServerSide(entity);
+                break;
+            case CLIENT:
+                entity.setServiceName(entity.getSourceServiceName());
+                toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
+                serviceRelationClientSide(entity);
+                break;
+            default:
+        }
+    }
+
+    private void serviceRelationServerSide(MeterEntity entity) {
         ServiceRelationServerSideMetrics metrics = new ServiceRelationServerSideMetrics();
         metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
         metrics.setSourceServiceId(entity.sourceServiceId());
@@ -319,7 +328,7 @@ public class Analyzer {
         MetricsStreamProcessor.getInstance().in(metrics);
     }
 
-    private void clientSide(MeterEntity entity) {
+    private void serviceRelationClientSide(MeterEntity entity) {
         ServiceRelationClientSideMetrics metrics = new ServiceRelationClientSideMetrics();
         metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
         metrics.setSourceServiceId(entity.sourceServiceId());
@@ -328,4 +337,37 @@ public class Analyzer {
         metrics.setEntityId(entity.id());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
+
+    private void processRelationTraffic(MeterEntity entity) {
+        switch (entity.getDetectPoint()) {
+            case SERVER:
+                processRelationServerSide(entity);
+                break;
+            case CLIENT:
+                processRelationClientSide(entity);
+                break;
+            default:
+        }
+    }
+
+    private void processRelationServerSide(MeterEntity entity) {
+        ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+        metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        metrics.setServiceInstanceId(entity.serviceInstanceId());
+        metrics.setSourceProcessId(entity.getSourceProcessId());
+        metrics.setDestProcessId(entity.getDestProcessId());
+        metrics.setEntityId(entity.id());
+        MetricsStreamProcessor.getInstance().in(metrics);
+    }
+
+    private void processRelationClientSide(MeterEntity entity) {
+        ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+        metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        metrics.setServiceInstanceId(entity.serviceInstanceId());
+        metrics.setSourceProcessId(entity.getSourceProcessId());
+        metrics.setDestProcessId(entity.getDestProcessId());
+        metrics.setEntityId(entity.id());
+        MetricsStreamProcessor.getInstance().in(metrics);
+    }
+
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
index 07ff140a1e..54a3e79261 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
@@ -26,6 +26,11 @@ import java.util.List;
 import java.util.StringJoiner;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
+import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingException;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 
@@ -48,14 +53,28 @@ public class MetricConvert {
 
     public MetricConvert(MetricRuleConfig rule, MeterSystem service) {
         Preconditions.checkState(!Strings.isNullOrEmpty(rule.getMetricPrefix()));
+        // init expression script
+        if (StringUtils.isNotEmpty(rule.getInitExp())) {
+            handleInitExp(rule.getInitExp());
+        }
+
         this.analyzers = rule.getMetricsRules().stream().map(
-            r -> Analyzer.build(
-                formatMetricName(rule, r.getName()),
-                rule.getFilter(),
-                Strings.isNullOrEmpty(rule.getExpSuffix()) ?
-                    r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
-                service
-            )
+            r -> {
+                String exp = r.getExp();
+                if (!Strings.isNullOrEmpty(rule.getExpPrefix())) {
+                    exp = String.format("(%s.%s).%s", StringUtils.substringBefore(exp, "."), rule.getExpPrefix(),
+                            StringUtils.substringAfter(exp, "."));
+                }
+                if (!Strings.isNullOrEmpty(rule.getExpSuffix())) {
+                    exp = String.format("(%s).%s", exp, rule.getExpSuffix());
+                }
+                return Analyzer.build(
+                    formatMetricName(rule, r.getName()),
+                    rule.getFilter(),
+                    exp,
+                    service
+                );
+            }
         ).collect(toList());
     }
 
@@ -83,4 +102,13 @@ public class MetricConvert {
         metricName.add(rule.getMetricPrefix()).add(meterRuleName);
         return metricName.toString();
     }
+
+    private void handleInitExp(String exp) {
+        Expression e = DSL.parse(exp);
+        final Result result = e.run(ImmutableMap.of());
+        if (!result.isSuccess() && result.isThrowable()) {
+            throw new ExpressionParsingException(
+                "failed to execute init expression: " + exp + ", error:" + result.getError());
+        }
+    }
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
index 831f4c8a87..2e75b1d89c 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
@@ -35,6 +35,11 @@ public interface MetricRuleConfig {
      */
     String getExpSuffix();
 
+    /**
+     * Get MAL expression prefix
+     */
+    String getExpPrefix();
+
     /**
      * Get all rules
      */
@@ -42,6 +47,11 @@ public interface MetricRuleConfig {
 
     String getFilter();
 
+    /**
+     * Get the init expression script
+     */
+    String getInitExp();
+
     interface RuleConfig {
         /**
          * Get definition metrics name
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
index ee0b9debdd..d4f0415c03 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
@@ -25,7 +25,10 @@ import groovy.util.DelegatingScript;
 import java.lang.reflect.Array;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.skywalking.oap.meter.analyzer.dsl.registry.ProcessRegistry;
 import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
+import org.apache.skywalking.oap.meter.analyzer.k8s.Kubernetes;
 import org.apache.skywalking.oap.server.core.analysis.Layer;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.codehaus.groovy.ast.stmt.DoWhileStatement;
@@ -54,6 +57,8 @@ public final class DSL {
         icz.addImport("K8sRetagType", K8sRetagType.class.getName());
         icz.addImport("DetectPoint", DetectPoint.class.getName());
         icz.addImport("Layer", Layer.class.getName());
+        icz.addImport("ProcessRegistry", ProcessRegistry.class.getName());
+        icz.addImport("Kubernetes", Kubernetes.class.getName());
         cc.addCompilationCustomizers(icz);
 
         final SecureASTCustomizer secureASTCustomizer = new SecureASTCustomizer();
@@ -73,7 +78,9 @@ public final class DSL {
                          .add(K8sRetagType.class)
                          .add(DetectPoint.class)
                          .add(Layer.class)
-                         .build());
+                         .add(ProcessRegistry.class)
+                         .add(Kubernetes.class)
+                .build());
         cc.addCompilationCustomizers(secureASTCustomizer);
 
         GroovyShell sh = new GroovyShell(new Binding(), cc);
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java
new file mode 100644
index 0000000000..8851bbf9b5
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription;
+
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+
+import java.util.List;
+
+@Getter
+@RequiredArgsConstructor
+@ToString
+public class ProcessRelationEntityDescription implements EntityDescription {
+    private final ScopeType scopeType = ScopeType.PROCESS_RELATION;
+    private final List<String> serviceKeys;
+    private final List<String> instanceKeys;
+    private final String sourceProcessIdKey;
+    private final String destProcessIdKey;
+    private final String detectPointKey;
+    private final String delimiter;
+
+    @Override
+    public List<String> getLabelKeys() {
+        return ImmutableList.<String>builder()
+                .addAll(serviceKeys)
+                .addAll(instanceKeys)
+                .add(detectPointKey, sourceProcessIdKey, destProcessIdKey).build();
+    }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index f571ccae39..a09eaedb1f 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -71,11 +71,6 @@ public class ExpressionParsingContext implements Closeable {
 
     ScopeType scopeType;
 
-    /**
-     * Mark whether the retagByK8sMeta func in expressions is active
-     */
-    boolean isRetagByK8sMeta;
-
     /**
      * Get labels no scope related.
      *
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index 93f88cc6d1..acc7e9c489 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -23,9 +23,12 @@ import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
+import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ProcessRelationEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceRelationEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
@@ -36,6 +39,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -358,7 +362,6 @@ public class SampleFamily {
         Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
         Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
         Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceLabelName));
-        ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
         if (this == EMPTY) {
             return EMPTY;
         }
@@ -497,6 +500,38 @@ public class SampleFamily {
         return createMeterSamples(new ServiceRelationEntityDescription(sourceServiceKeys, destServiceKeys, detectPoint, layer, Const.POINT));
     }
 
+    public SampleFamily forEach(List<String> array, Closure<Void> each) {
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        return SampleFamily.build(this.context, Arrays.stream(this.samples).map(sample -> {
+            Map<String, String> labels = Maps.newHashMap(sample.getLabels());
+            for (String element : array) {
+                each.call(element, labels);
+            }
+            return sample.toBuilder().labels(ImmutableMap.copyOf(labels)).build();
+        }).toArray(Sample[]::new));
+    }
+
+    public SampleFamily processRelation(String detectPointKey, List<String> serviceKeys, List<String> instanceKeys, String sourceProcessIdKey, String destProcessIdKey) {
+        Preconditions.checkArgument(serviceKeys.size() > 0);
+        Preconditions.checkArgument(instanceKeys.size() > 0);
+        Preconditions.checkArgument(StringUtil.isNotEmpty(sourceProcessIdKey));
+        Preconditions.checkArgument(StringUtil.isNotEmpty(destProcessIdKey));
+        ExpressionParsingContext.get().ifPresent(ctx -> {
+            ctx.scopeType = ScopeType.PROCESS_RELATION;
+            ctx.scopeLabels.addAll(serviceKeys);
+            ctx.scopeLabels.addAll(instanceKeys);
+            ctx.scopeLabels.add(detectPointKey);
+            ctx.scopeLabels.add(sourceProcessIdKey);
+            ctx.scopeLabels.add(destProcessIdKey);
+        });
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        return createMeterSamples(new ProcessRelationEntityDescription(serviceKeys, instanceKeys, sourceProcessIdKey, destProcessIdKey, detectPointKey, Const.POINT));
+    }
+
     private SampleFamily createMeterSamples(EntityDescription entityDescription) {
         Map<MeterEntity, Sample[]> meterSamples = new HashMap<>();
         Arrays.stream(samples)
@@ -651,6 +686,17 @@ public class SampleFamily {
                         InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys(), serviceRelationEntityDescription.getDelimiter()),
                         serviceRelationEntityDescription.getDetectPoint(), serviceRelationEntityDescription.getLayer()
                     );
+                case PROCESS_RELATION:
+                    final ProcessRelationEntityDescription processRelationEntityDescription = (ProcessRelationEntityDescription) entityDescription;
+                    final String detectPointValue = InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getDetectPointKey()), processRelationEntityDescription.getDelimiter());
+                    DetectPoint point = StringUtils.equalsAnyIgnoreCase(detectPointValue, "server") ? DetectPoint.SERVER : DetectPoint.CLIENT;
+                    return MeterEntity.newProcessRelation(
+                        InternalOps.dim(samples, processRelationEntityDescription.getServiceKeys(), processRelationEntityDescription.getDelimiter()),
+                        InternalOps.dim(samples, processRelationEntityDescription.getInstanceKeys(), processRelationEntityDescription.getDelimiter()),
+                        InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getSourceProcessIdKey()), processRelationEntityDescription.getDelimiter()),
+                        InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getDestProcessIdKey()), processRelationEntityDescription.getDelimiter()),
+                        point
+                    );
                 default:
                     throw new UnexpectedException(
                         "Unexpected scope type of entityDescription " + entityDescription);
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java
new file mode 100644
index 0000000000..5298fa9b6c
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.registry;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+
+/**
+ * The dynamic entity registry for {@link ProcessTraffic}
+ */
+public class ProcessRegistry {
+
+    public static final String LOCAL_VIRTUAL_PROCESS = "UNKNOWN_LOCAL";
+    public static final String REMOTE_VIRTUAL_PROCESS = "UNKNOWN_REMOTE";
+
+    /**
+     * Generate virtual local process under the instance
+     * @return the process id
+     */
+    public static String generateVirtualLocalProcess(String service, String instance) {
+        return generateVirtualProcess(service, instance, LOCAL_VIRTUAL_PROCESS);
+    }
+
+    /**
+     * Generate virtual remote process under the instance
+     * trying to generate the name in the kubernetes environment through the remote address
+     * @return the process id
+     */
+    public static String generateVirtualRemoteProcess(String service, String instance, String remoteAddress) {
+        // remove port
+        String ip = StringUtils.substringBeforeLast(remoteAddress, ":");
+
+        // find remote through k8s metadata
+        String name = K8sInfoRegistry.getInstance().findPodByIP(ip);
+        if (StringUtils.isEmpty(name)) {
+            name = K8sInfoRegistry.getInstance().findServiceByIP(ip);
+        }
+        // if not exists, then just use remote unknown
+        if (StringUtils.isEmpty(name)) {
+            name = REMOTE_VIRTUAL_PROCESS;
+        }
+
+        return generateVirtualProcess(service, instance, name);
+    }
+
+    private static String generateVirtualProcess(String service, String instance, String processName) {
+        final ProcessTraffic traffic = new ProcessTraffic();
+        final String serviceId = IDManager.ServiceID.buildId(service, true);
+        traffic.setServiceId(serviceId);
+        traffic.setInstanceId(IDManager.ServiceInstanceID.buildId(serviceId, instance));
+        traffic.setName(processName);
+        traffic.setDetectType(ProcessDetectType.VIRTUAL.value());
+        traffic.setTimeBucket(TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
+        MetricsStreamProcessor.getInstance().in(traffic);
+        return traffic.id();
+    }
+}
\ No newline at end of file
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
index eb318bdd8b..d44bb84a39 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -24,13 +24,17 @@ import io.kubernetes.client.informer.ResourceEventHandler;
 import io.kubernetes.client.informer.SharedInformerFactory;
 import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1LoadBalancerIngress;
+import io.kubernetes.client.openapi.models.V1LoadBalancerStatus;
 import io.kubernetes.client.openapi.models.V1PodList;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceList;
+import io.kubernetes.client.openapi.models.V1ServiceStatus;
 import io.kubernetes.client.util.Config;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.models.V1Pod;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,6 +44,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 import static java.util.Objects.isNull;
 import static java.util.Optional.ofNullable;
@@ -52,6 +58,8 @@ public class K8sInfoRegistry {
     private final Map<String/* podName.namespace */, V1Pod> namePodMap = new ConcurrentHashMap<>();
     protected final Map<String/* serviceName.namespace  */, V1Service> nameServiceMap = new ConcurrentHashMap<>();
     private final Map<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
+    private final Map<String/* podIP */, String /* podName.namespace */> ipPodMap = new ConcurrentHashMap<>();
+    private final Map<String/* serviceIP */, String /* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
     private ExecutorService executor;
     private static final String SEPARATOR = ".";
 
@@ -168,6 +176,10 @@ public class K8sInfoRegistry {
         ofNullable(service.getMetadata()).ifPresent(
             metadata -> nameServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace())
         );
+        ofNullable(service.getStatus()).map(V1ServiceStatus::getLoadBalancer).filter(Objects::nonNull)
+            .map(V1LoadBalancerStatus::getIngress).filter(CollectionUtils::isNotEmpty)
+            .ifPresent(l -> l.stream().filter(i -> StringUtil.isNotEmpty(i.getIp()))
+                    .forEach(i -> ipServiceMap.remove(i.getIp())));
         recompose();
     }
 
@@ -184,10 +196,18 @@ public class K8sInfoRegistry {
 
         ofNullable(pod.getMetadata()).ifPresent(
             metadata -> podServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
+
+        ofNullable(pod.getStatus()).filter(s -> StringUtil.isNotEmpty(s.getPodIP())).ifPresent(
+            status -> ipPodMap.remove(status.getPodIP()));
     }
 
     private void recompose() {
         namePodMap.forEach((podName, pod) -> {
+            if (!isNull(pod.getMetadata())) {
+                ofNullable(pod.getStatus()).filter(s -> StringUtil.isNotEmpty(s.getPodIP())).ifPresent(
+                        status -> ipPodMap.put(status.getPodIP(), podName));
+            }
+
             nameServiceMap.forEach((serviceName, service) -> {
                 if (isNull(pod.getMetadata()) || isNull(service.getMetadata()) || isNull(service.getSpec())) {
                     return;
@@ -213,12 +233,35 @@ public class K8sInfoRegistry {
                 }
             });
         });
+        nameServiceMap.forEach((serviceName, service) -> {
+            if (isNull(service.getMetadata()) || isNull(service.getStatus()) || isNull(service.getStatus().getLoadBalancer())) {
+                return;
+            }
+            final List<V1LoadBalancerIngress> ingress = service.getStatus().getLoadBalancer().getIngress();
+            if (CollectionUtils.isEmpty(ingress)) {
+                return;
+            }
+
+            for (V1LoadBalancerIngress loadBalancerIngress : ingress) {
+                if (StringUtil.isNotEmpty(loadBalancerIngress.getIp())) {
+                    ipServiceMap.put(loadBalancerIngress.getIp(), serviceName);
+                }
+            }
+        });
     }
 
     public String findServiceName(String namespace, String podName) {
         return this.podServiceMap.get(podName + SEPARATOR + namespace);
     }
 
+    public String findPodByIP(String ip) {
+        return this.ipPodMap.get(ip);
+    }
+
+    public String findServiceByIP(String ip) {
+        return this.ipServiceMap.get(ip);
+    }
+
     private boolean hasIntersection(Collection<?> o, Collection<?> c) {
         Objects.requireNonNull(o);
         Objects.requireNonNull(c);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
copy to oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
index 3d0a4a26bd..772be04409 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
@@ -16,21 +16,13 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.meter;
+package org.apache.skywalking.oap.meter.analyzer.k8s;
 
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-
-public enum ScopeType {
-    SERVICE(DefaultScopeDefine.SERVICE),
-    SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
-    ENDPOINT(DefaultScopeDefine.ENDPOINT),
-    SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION);
-
-    @Getter
-    private final int scopeId;
-
-    ScopeType(final int scopeId) {
-        this.scopeId = scopeId;
+public class Kubernetes {
+    /**
+     * Start the listen the kubernetes metadata
+     */
+    public static void startMetadataListener() {
+        K8sInfoRegistry.getInstance().start();
     }
 }
\ No newline at end of file
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
index ea4e78ab9e..68d6b0a5b9 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
@@ -37,6 +37,8 @@ public class Rule implements MetricRuleConfig {
     private StaticConfig staticConfig;
     private String metricPrefix;
     private String expSuffix;
+    private String expPrefix;
     private String filter;
+    private String initExp;
     private List<MetricsRule> metricsRules;
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
index a1a28a7c55..e9f72cbc4a 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
@@ -123,6 +123,19 @@ public class FunctionTest {
                 ),
                 false,
             },
+            {
+                "for-each",
+                of("http_success_request", SampleFamilyBuilder.newBuilder(
+                    Sample.builder().labels(of("region", "us")).name("http_success_request").build(),
+                    Sample.builder().labels(of("region", "cn")).name("http_success_request").build()
+                ).build()),
+                "http_success_request.forEach(['v1', 'v2'], {element, tags -> tags[element] = 'test'})",
+                Result.success(SampleFamilyBuilder.newBuilder(
+                    Sample.builder().labels(of("region", "us", "v1", "test", "v2", "test")).name("http_success_request").build(),
+                    Sample.builder().labels(of("region", "cn", "v1", "test", "v2", "test")).name("http_success_request").build()
+                ).build()),
+                false,
+            },
         });
     }
 
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
index 199d8d9945..caa39ac8d0 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -19,17 +19,23 @@
 package org.apache.skywalking.oap.meter.analyzer.dsl;
 
 import com.google.common.collect.ImmutableMap;
+import io.kubernetes.client.openapi.models.V1LoadBalancerIngress;
+import io.kubernetes.client.openapi.models.V1LoadBalancerStatus;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodStatus;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+
+import io.kubernetes.client.openapi.models.V1ServiceStatus;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.Retag;
 import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -204,6 +210,37 @@ public class K8sTagTest {
                 ).build()),
                 false,
                 },
+            {
+                "IPAddress_to_name",
+                of("rover_network_profiling_process_write_bytes", SampleFamilyBuilder.newBuilder(
+                    Sample.builder()
+                        .labels(
+                            of("service", "test", "instance", "test-instance", "side", "client",
+                                "client_address", "1.1.1.1", "server_address", "2.2.2.2")
+                        )
+                        .value(2)
+                        .name("rover_network_profiling_process_write_bytes")
+                        .build()
+                ).build()),
+                "rover_network_profiling_process_write_bytes.forEach(['client', 'server'] , " +
+                    "{prefix, tags -> tags[prefix + '_process_id'] = ProcessRegistry.generateVirtualRemoteProcess(tags.service, tags.instance, tags[prefix + '_address'])})",
+                Result.success(SampleFamilyBuilder.newBuilder(
+                    Sample.builder()
+                        .labels(
+                            of("service", "test", "instance", "test-instance", "side", "client",
+                                "client_address", "1.1.1.1", "client_process_id", IDManager.ProcessID.buildId(
+                                    IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId("test", true), "test-instance"),
+                                    "my-nginx-5dc4865748-mbczh.default"),
+                                "server_address", "2.2.2.2", "server_process_id", IDManager.ProcessID.buildId(
+                                    IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId("test", true), "test-instance"),
+                                    "kube-state-metrics.kube-system"))
+                        )
+                        .value(2)
+                        .name("rover_network_profiling_process_write_bytes")
+                        .build()
+                ).build()),
+                false,
+            }
             });
     }
 
@@ -213,36 +250,37 @@ public class K8sTagTest {
         Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE",
                                   Mockito.spy(K8sInfoRegistry.getInstance())
         );
+        PowerMockito.doNothing().when(K8sInfoRegistry.getInstance(), "start");
 
         PowerMockito.when(
-            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
                     .thenCallRealMethod();
         PowerMockito.when(
             K8sInfoRegistry.getInstance(), "addService",
-            mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"))
+            mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2")
         ).thenCallRealMethod();
         PowerMockito.when(
             K8sInfoRegistry.getInstance(), "addPod",
-            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
         ).thenCallRealMethod();
         PowerMockito.when(
             K8sInfoRegistry.getInstance(), "addPod",
-            mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"))
+            mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2")
         ).thenCallRealMethod();
 
         PowerMockito.when(
-            K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx")))
+            K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
                     .thenCallRealMethod();
         PowerMockito.when(
             K8sInfoRegistry.getInstance(), "removePod",
-            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
         ).thenCallRealMethod();
         PowerMockito.when(
-            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
                     .thenCallRealMethod();
         PowerMockito.when(
             K8sInfoRegistry.getInstance(), "addPod",
-            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
         ).thenCallRealMethod();
 
     }
@@ -266,7 +304,7 @@ public class K8sTagTest {
         assertThat(r, is(want));
     }
 
-    private V1Service mockService(String name, String namespace, Map<String, String> selector) {
+    private V1Service mockService(String name, String namespace, Map<String, String> selector, String ipAddress) {
         V1Service service = new V1Service();
         V1ObjectMeta serviceMeta = new V1ObjectMeta();
         V1ServiceSpec v1ServiceSpec = new V1ServiceSpec();
@@ -277,15 +315,26 @@ public class K8sTagTest {
         v1ServiceSpec.setSelector(selector);
         service.setSpec(v1ServiceSpec);
 
+        final V1ServiceStatus v1ServiceStatus = new V1ServiceStatus();
+        final V1LoadBalancerStatus balancerStatus = new V1LoadBalancerStatus();
+        final V1LoadBalancerIngress loadBalancerIngress = new V1LoadBalancerIngress();
+        loadBalancerIngress.setIp(ipAddress);
+        balancerStatus.setIngress(Arrays.asList(loadBalancerIngress));
+        v1ServiceStatus.setLoadBalancer(balancerStatus);
+        service.setStatus(v1ServiceStatus);
+
         return service;
     }
 
-    private V1Pod mockPod(String name, String namespace, Map<String, String> labels) {
+    private V1Pod mockPod(String name, String namespace, Map<String, String> labels, String ipAddress) {
         V1Pod v1Pod = new V1Pod();
         V1ObjectMeta podMeta = new V1ObjectMeta();
         podMeta.setName(name);
         podMeta.setNamespace(namespace);
         podMeta.setLabels(labels);
+        final V1PodStatus status = new V1PodStatus();
+        status.setPodIP(ipAddress);
+        v1Pod.setStatus(status);
         v1Pod.setMetadata(podMeta);
 
         return v1Pod;
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
index 0cd95c7ab4..24882ecaa8 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
@@ -532,6 +532,46 @@ public class ScopeTest {
                         );
                     }
                 }
+            },
+            {
+                "sum_process_relation",
+                of("rover_network_profiling_process_write_bytes", SampleFamilyBuilder.newBuilder(
+                    Sample.builder()
+                        .labels(of("service", "test", "instance", "test-instance", "side", "server", "client_process_id", "abc", "server_process_id", "def"))
+                        .value(11)
+                        .name("rover_network_profiling_process_write_bytes")
+                        .build(),
+                    Sample.builder()
+                        .labels(of("service", "test", "instance", "test-instance", "side", "client", "client_process_id", "abc", "server_process_id", "def"))
+                        .value(12)
+                        .name("rover_network_profiling_process_write_bytes")
+                        .build()
+                ).build()),
+                "rover_network_profiling_process_write_bytes.sum(['service' ,'instance', 'side', 'client_process_id', 'server_process_id'])" +
+                    ".processRelation('side', ['service'], ['instance'], 'client_process_id', 'server_process_id')",
+                false,
+                new HashMap<MeterEntity, Sample[]>() {
+                    {
+                        put(
+                            MeterEntity.newProcessRelation("test", "test-instance", "abc", "def", DetectPoint.SERVER),
+                            new Sample[] {
+                                Sample.builder()
+                                    .labels(of())
+                                    .value(11)
+                                    .name("rover_network_profiling_process_write_bytes").build()
+                            }
+                        );
+                        put(
+                            MeterEntity.newProcessRelation("test", "test-instance", "abc", "def", DetectPoint.CLIENT),
+                            new Sample[] {
+                                Sample.builder()
+                                    .labels(of())
+                                    .value(12)
+                                    .name("rover_network_profiling_process_write_bytes").build()
+                            }
+                        );
+                    }
+                }
             }
         });
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
index dc09772c56..cf67d0eb62 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
@@ -268,6 +268,32 @@ public class IDManager {
             return Hashing.sha256().newHasher().putString(String.format("%s_%s",
                     name, instanceId), Charsets.UTF_8).hash().toString();
         }
+
+        /**
+         * @return encoded process relation id
+         */
+        public static String buildRelationId(ProcessRelationDefine define) {
+            return define.sourceId + Const.RELATION_ID_CONNECTOR + define.destId;
+        }
+
+        /**
+         * @return process relation ID object decoded from {@link #buildRelationId(ProcessRelationDefine)} result
+         */
+        public static ProcessRelationDefine analysisRelationId(String entityId) {
+            String[] parts = entityId.split(Const.RELATION_ID_PARSER_SPLIT);
+            if (parts.length != 2) {
+                throw new RuntimeException("Illegal Process Relation entity id");
+            }
+            return new ProcessRelationDefine(parts[0], parts[1]);
+        }
+
+        @RequiredArgsConstructor
+        @Getter
+        @EqualsAndHashCode
+        public static class ProcessRelationDefine {
+            private final String sourceId;
+            private final String destId;
+        }
     }
 
     /**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java
new file mode 100644
index 0000000000..35f45b2bca
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@Stream(name = ProcessRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.PROCESS_RELATION,
+        builder = ProcessRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = true, timeRelativeID = true)
+@EqualsAndHashCode(of = {
+        "entityId"
+}, callSuper = true)
+public class ProcessRelationClientSideMetrics extends Metrics {
+
+    public static final String INDEX_NAME = "process_relation_client_side";
+    public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+    public static final String SOURCE_PROCESS_ID = "source_process_id";
+    public static final String DEST_PROCESS_ID = "dest_process_id";
+
+    @Setter
+    @Getter
+    @Column(columnName = SERVICE_INSTANCE_ID)
+    private String serviceInstanceId;
+    @Setter
+    @Getter
+    @Column(columnName = SOURCE_PROCESS_ID)
+    private String sourceProcessId;
+    @Setter
+    @Getter
+    @Column(columnName = DEST_PROCESS_ID)
+    private String destProcessId;
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    @Override
+    protected String id0() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public boolean combine(Metrics metrics) {
+        if (this.getTimeBucket() > metrics.getTimeBucket()) {
+            this.setTimeBucket(metrics.getTimeBucket());
+        }
+        return true;
+    }
+
+    @Override
+    public void calculate() {
+    }
+
+    @Override
+    public Metrics toHour() {
+        return null;
+    }
+
+    @Override
+    public Metrics toDay() {
+        return null;
+    }
+
+    @Override
+    public void deserialize(RemoteData remoteData) {
+        setServiceInstanceId(remoteData.getDataStrings(0));
+        setSourceProcessId(remoteData.getDataStrings(1));
+        setDestProcessId(remoteData.getDataStrings(2));
+        setEntityId(remoteData.getDataStrings(3));
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        final RemoteData.Builder builder = RemoteData.newBuilder();
+        builder.addDataStrings(getServiceInstanceId());
+        builder.addDataStrings(getSourceProcessId());
+        builder.addDataStrings(getDestProcessId());
+        builder.addDataStrings(getEntityId());
+        return builder;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return this.entityId.hashCode();
+    }
+
+    public static class Builder implements StorageBuilder<ProcessRelationClientSideMetrics> {
+        @Override
+        public ProcessRelationClientSideMetrics storage2Entity(final Convert2Entity converter) {
+            ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+            metrics.setServiceInstanceId((String) converter.get(SERVICE_INSTANCE_ID));
+            metrics.setSourceProcessId((String) converter.get(SOURCE_PROCESS_ID));
+            metrics.setDestProcessId((String) converter.get(DEST_PROCESS_ID));
+            metrics.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            metrics.setEntityId((String) converter.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public void entity2Storage(final ProcessRelationClientSideMetrics storageData,
+                                   final Convert2Storage converter) {
+            converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+            converter.accept(SOURCE_PROCESS_ID, storageData.getSourceProcessId());
+            converter.accept(DEST_PROCESS_ID, storageData.getDestProcessId());
+            converter.accept(ENTITY_ID, storageData.getEntityId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java
new file mode 100644
index 0000000000..ccf5ff34f6
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ProcessRelation;
+
+public class ProcessRelationDispatcher implements SourceDispatcher<ProcessRelation> {
+    @Override
+    public void dispatch(ProcessRelation source) {
+        switch (source.getDetectPoint()) {
+            case SERVER:
+                processRelationServerSide(source);
+                break;
+            case CLIENT:
+                processRelationClientSide(source);
+                break;
+        }
+    }
+
+    private void processRelationServerSide(ProcessRelation relation) {
+        ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+        metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        metrics.setServiceInstanceId(relation.getInstanceId());
+        metrics.setSourceProcessId(relation.getSourceProcessId());
+        metrics.setDestProcessId(relation.getDestProcessId());
+        metrics.setEntityId(relation.getEntityId());
+        MetricsStreamProcessor.getInstance().in(metrics);
+    }
+
+    private void processRelationClientSide(ProcessRelation entity) {
+        ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+        metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        metrics.setServiceInstanceId(entity.getInstanceId());
+        metrics.setSourceProcessId(entity.getSourceProcessId());
+        metrics.setDestProcessId(entity.getDestProcessId());
+        metrics.setEntityId(entity.getEntityId());
+        MetricsStreamProcessor.getInstance().in(metrics);
+    }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java
new file mode 100644
index 0000000000..5dd9c880eb
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@Stream(name = ProcessRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.PROCESS_RELATION,
+        builder = ProcessRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = true, timeRelativeID = true)
+@EqualsAndHashCode(of = {
+        "entityId"
+}, callSuper = true)
+public class ProcessRelationServerSideMetrics extends Metrics {
+
+    public static final String INDEX_NAME = "process_relation_server_side";
+    public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+    public static final String SOURCE_PROCESS_ID = "source_process_id";
+    public static final String DEST_PROCESS_ID = "dest_process_id";
+
+    @Setter
+    @Getter
+    @Column(columnName = SERVICE_INSTANCE_ID)
+    private String serviceInstanceId;
+    @Setter
+    @Getter
+    @Column(columnName = SOURCE_PROCESS_ID)
+    private String sourceProcessId;
+    @Setter
+    @Getter
+    @Column(columnName = DEST_PROCESS_ID)
+    private String destProcessId;
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    @Override
+    protected String id0() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public boolean combine(Metrics metrics) {
+        if (this.getTimeBucket() > metrics.getTimeBucket()) {
+            this.setTimeBucket(metrics.getTimeBucket());
+        }
+        return true;
+    }
+
+    @Override
+    public void calculate() {
+    }
+
+    @Override
+    public Metrics toHour() {
+        return null;
+    }
+
+    @Override
+    public Metrics toDay() {
+        return null;
+    }
+
+    @Override
+    public void deserialize(RemoteData remoteData) {
+        setServiceInstanceId(remoteData.getDataStrings(0));
+        setSourceProcessId(remoteData.getDataStrings(1));
+        setDestProcessId(remoteData.getDataStrings(2));
+        setEntityId(remoteData.getDataStrings(3));
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        final RemoteData.Builder builder = RemoteData.newBuilder();
+        builder.addDataStrings(getServiceInstanceId());
+        builder.addDataStrings(getSourceProcessId());
+        builder.addDataStrings(getDestProcessId());
+        builder.addDataStrings(getEntityId());
+        return builder;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return this.entityId.hashCode();
+    }
+
+    public static class Builder implements StorageBuilder<ProcessRelationServerSideMetrics> {
+        @Override
+        public ProcessRelationServerSideMetrics storage2Entity(final Convert2Entity converter) {
+            ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+            metrics.setServiceInstanceId((String) converter.get(SERVICE_INSTANCE_ID));
+            metrics.setSourceProcessId((String) converter.get(SOURCE_PROCESS_ID));
+            metrics.setDestProcessId((String) converter.get(DEST_PROCESS_ID));
+            metrics.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            metrics.setEntityId((String) converter.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public void entity2Storage(final ProcessRelationServerSideMetrics storageData,
+                                   final Convert2Storage converter) {
+            converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+            converter.accept(SOURCE_PROCESS_ID, storageData.getSourceProcessId());
+            converter.accept(DEST_PROCESS_ID, storageData.getDestProcessId());
+            converter.accept(ENTITY_ID, storageData.getEntityId());
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
index 2c8b8a15cc..eb40eb1593 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -44,6 +44,8 @@ public class MeterEntity {
     private String endpointName;
     private String sourceServiceName;
     private String destServiceName;
+    private String sourceProcessId;
+    private String destProcessId;
     private DetectPoint detectPoint;
     private Layer layer;
 
@@ -66,6 +68,11 @@ public class MeterEntity {
                     sourceServiceId(),
                     destServiceId()
                 ));
+            case PROCESS_RELATION:
+                return IDManager.ProcessID.buildRelationId(new IDManager.ProcessID.ProcessRelationDefine(
+                    sourceProcessId,
+                    destProcessId
+                ));
             default:
                 throw new UnexpectedException("Unexpected scope type of entity " + this.toString());
         }
@@ -75,6 +82,10 @@ public class MeterEntity {
         return IDManager.ServiceID.buildId(serviceName, true);
     }
 
+    public String serviceInstanceId() {
+        return IDManager.ServiceInstanceID.buildId(serviceId(), instanceName);
+    }
+
     public String sourceServiceId() {
         return IDManager.ServiceID.buildId(sourceServiceName, true);
     }
@@ -138,4 +149,16 @@ public class MeterEntity {
         meterEntity.layer = layer;
         return meterEntity;
     }
+
+    public static MeterEntity newProcessRelation(String serviceName, String instanceName,
+                                                 String sourceProcessId, String destProcessId, DetectPoint detectPoint) {
+        final MeterEntity meterEntity = new MeterEntity();
+        meterEntity.scopeType = ScopeType.PROCESS_RELATION;
+        meterEntity.serviceName = serviceName;
+        meterEntity.instanceName = instanceName;
+        meterEntity.sourceProcessId = sourceProcessId;
+        meterEntity.destProcessId = destProcessId;
+        meterEntity.detectPoint = detectPoint;
+        return meterEntity;
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
index 3d0a4a26bd..4f89cc77c2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
@@ -25,7 +25,8 @@ public enum ScopeType {
     SERVICE(DefaultScopeDefine.SERVICE),
     SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
     ENDPOINT(DefaultScopeDefine.ENDPOINT),
-    SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION);
+    SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION),
+    PROCESS_RELATION(DefaultScopeDefine.PROCESS_RELATION);
 
     @Getter
     private final int scopeId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 4c99e6902b..7e0eec94d7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -105,6 +105,7 @@ public class DefaultScopeDefine {
     public static final int ZIPKIN_SERVICE = 51;
     public static final int ZIPKIN_SERVICE_SPAN = 52;
     public static final int ZIPKIN_SERVICE_RELATION = 53;
+    public static final int PROCESS_RELATION = 54;
 
     /**
      * Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
@@ -116,6 +117,7 @@ public class DefaultScopeDefine {
     public static final String SERVICE_INSTANCE_RELATION_CATALOG_NAME = "SERVICE_INSTANCE_RELATION";
     public static final String ENDPOINT_RELATION_CATALOG_NAME = "ENDPOINT_RELATION";
     public static final String PROCESS_CATALOG_NAME = "PROCESS";
+    public static final String PROCESS_RELATION_CATALOG_NAME = "PROCESS_RELATION";
 
     private static final Map<Integer, Boolean> SERVICE_CATALOG = new HashMap<>();
     private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java
new file mode 100644
index 0000000000..191858bd97
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS_RELATION;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS_RELATION_CATALOG_NAME;
+
+@ScopeDeclaration(id = PROCESS_RELATION, name = "ProcessRelation", catalog = PROCESS_RELATION_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class ProcessRelation extends Source {
+    private String entityId;
+
+    @Override
+    public int scope() {
+        return PROCESS_RELATION;
+    }
+
+    @Override
+    public String getEntityId() {
+        if (StringUtils.isEmpty(entityId)) {
+            entityId = IDManager.ProcessID.buildRelationId(
+                new IDManager.ProcessID.ProcessRelationDefine(
+                    sourceProcessId,
+                    destProcessId
+                )
+            );
+        }
+        return entityId;
+    }
+
+    @Getter
+    @Setter
+    private String instanceId;
+    @Getter
+    @Setter
+    private String sourceProcessId;
+    @Getter
+    @Setter
+    private String destProcessId;
+    @Getter
+    @Setter
+    private DetectPoint detectPoint;
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
index b167c5f730..c419d38d15 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
@@ -28,7 +28,9 @@ public class ZabbixConfig implements MetricRuleConfig {
 
     private String metricPrefix;
     private String expSuffix;
+    private String expPrefix;
     private String filter;
+    private String initExp;
     private Entities entities;
     private List<String> requiredZabbixItemKeys;
     private List<Metric> metrics;
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
index 8310351653..a0d98f67ab 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+initExp: Kubernetes.startMetadataListener()
 filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
 expSuffix: tag({tags -> tags.cluster = 'k8s-cluster::' + tags.cluster}).service(['cluster'], Layer.K8S)
 metricPrefix: k8s_cluster
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
index 8d723fbd4b..28ff4696a9 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+initExp: Kubernetes.startMetadataListener()
 filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
 expSuffix: |-
   service(['cluster' , 'service'], '::', Layer.K8S_SERVICE)
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
index 0bdcd5e400..9a72f81f74 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+initExp: Kubernetes.startMetadataListener()
 filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
 expSuffix: service(['cluster' , 'service'], '::', Layer.K8S_SERVICE)
 metricPrefix: k8s_service