You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/04 12:59:40 UTC
[skywalking] branch master updated: Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 7bb8c69c67 Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)
7bb8c69c67 is described below
commit 7bb8c69c673df02202646e3a09cc6720e8d2db4d
Author: mrproliu <74...@qq.com>
AuthorDate: Mon Jul 4 20:59:34 2022 +0800
Add `forEach`, `processRelation` function to MAL Expression, and add `expPrefix` and `initExp` in MAL config (#9299)
---
docs/en/changes/changes.md | 2 +
docs/en/concepts-and-designs/mal.md | 6 +
docs/en/setup/backend/backend-meter.md | 4 +
docs/en/setup/backend/backend-zabbix.md | 4 +
docs/en/setup/backend/prometheus-metrics.md | 4 +
.../provider/meter/config/MeterConfig.java | 2 +
.../skywalking/oap/meter/analyzer/Analyzer.java | 74 ++++++++---
.../oap/meter/analyzer/MetricConvert.java | 42 +++++--
.../oap/meter/analyzer/MetricRuleConfig.java | 10 ++
.../skywalking/oap/meter/analyzer/dsl/DSL.java | 9 +-
.../ProcessRelationEntityDescription.java | 48 +++++++
.../analyzer/dsl/ExpressionParsingContext.java | 5 -
.../oap/meter/analyzer/dsl/SampleFamily.java | 48 ++++++-
.../analyzer/dsl/registry/ProcessRegistry.java | 79 ++++++++++++
.../oap/meter/analyzer/k8s/K8sInfoRegistry.java | 43 +++++++
.../oap/meter/analyzer/k8s/Kubernetes.java} | 22 ++--
.../oap/meter/analyzer/prometheus/rule/Rule.java | 2 +
.../oap/meter/analyzer/dsl/FunctionTest.java | 13 ++
.../oap/meter/analyzer/dsl/K8sTagTest.java | 69 +++++++++--
.../oap/meter/analyzer/dsl/ScopeTest.java | 40 ++++++
.../oap/server/core/analysis/IDManager.java | 26 ++++
.../process/ProcessRelationClientSideMetrics.java | 138 +++++++++++++++++++++
.../process/ProcessRelationDispatcher.java | 59 +++++++++
.../process/ProcessRelationServerSideMetrics.java | 138 +++++++++++++++++++++
.../server/core/analysis/meter/MeterEntity.java | 23 ++++
.../oap/server/core/analysis/meter/ScopeType.java | 3 +-
.../oap/server/core/source/DefaultScopeDefine.java | 2 +
.../oap/server/core/source/ProcessRelation.java | 64 ++++++++++
.../zabbix/provider/config/ZabbixConfig.java | 2 +
.../main/resources/otel-oc-rules/k8s-cluster.yaml | 1 +
.../main/resources/otel-oc-rules/k8s-instance.yaml | 1 +
.../main/resources/otel-oc-rules/k8s-service.yaml | 1 +
32 files changed, 928 insertions(+), 56 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 5a64514d71..db6ea1c71e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -15,6 +15,8 @@
* Add component ID(128) for Java Hutool plugin.
* Add Zipkin query exception handler, response error message for illegal arguments.
* Fix a NullPointerException in the endpoint analysis, which would cause missing MQ-related `LocalSpan` in the trace.
+* Add `forEach`, `processRelation` function to MAL expression.
+* Add `expPrefix`, `initExp` in MAL config.
#### UI
diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index 85d6d693b6..e92ceef7cb 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -211,6 +211,10 @@ Examples:
#### time
`time()`: Returns the number of seconds since January 1, 1970 UTC.
+#### foreach
+`forEach([string_array], Closure<Void> each)`: Iterates all samples according to the first array argument, and provide two parameters in the second closure argument:
+1. `element`: element in the array.
+2. `tags`: tags in each sample.
## Down Sampling Operation
MAL should instruct meter-system on how to downsample for metrics. It doesn't only refer to aggregate raw samples to
@@ -245,6 +249,8 @@ They extract level relevant labels from metric labels, then informs the meter-sy
extracts endpoint level labels from the second array argument, extracts layer from `Layer` argument.
- `serviceRelation(DetectPoint, [source_svc_label1...], [dest_svc_label1...], Layer)` DetectPoint including `DetectPoint.CLIENT` and `DetectPoint.SERVER`,
extracts `sourceService` labels from the first array argument, extracts `destService` labels from the second array argument, extracts layer from `Layer` argument.
+ - `processRelation(detect_point_label, [service_label1...], [instance_label1...], source_process_id_label, dest_process_id_label)` extracts `DetectPoint` labels from first argument, the label value should be `client` or `server`.
+ extracts `Service` labels from the first array argument, extracts `Instance` labels from the second array argument, extracts `ProcessID` labels from the fourth and fifth arguments of the source and destination.
## More Examples
diff --git a/docs/en/setup/backend/backend-meter.md b/docs/en/setup/backend/backend-meter.md
index 1e48ded8de..dee81764e7 100644
--- a/docs/en/setup/backend/backend-meter.md
+++ b/docs/en/setup/backend/backend-meter.md
@@ -81,8 +81,12 @@ If you're using Spring Sleuth, see [Spring Sleuth Setup](spring-sleuth-setup.md)
### Meters configuration
```yaml
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
# filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
diff --git a/docs/en/setup/backend/backend-zabbix.md b/docs/en/setup/backend/backend-zabbix.md
index 015ce8afb3..5a14f96ce8 100644
--- a/docs/en/setup/backend/backend-zabbix.md
+++ b/docs/en/setup/backend/backend-zabbix.md
@@ -30,8 +30,12 @@ You can find details on Zabbix agent items from [Zabbix Agent documentation](htt
### Configuration file
```yaml
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
metricPrefix: <string>
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# Datasource from Zabbix Item keys.
diff --git a/docs/en/setup/backend/prometheus-metrics.md b/docs/en/setup/backend/prometheus-metrics.md
index f14113c871..8fa784e542 100644
--- a/docs/en/setup/backend/prometheus-metrics.md
+++ b/docs/en/setup/backend/prometheus-metrics.md
@@ -37,8 +37,12 @@ staticConfig:
# Labels assigned to all metrics fetched from the targets.
labels:
[ <labelname>: <labelvalue> ... ]
+# initExp is the expression that initializes the current configuration file
+initExp: <string>
# filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
+# expPrefix is executed before the metrics executes other functions.
+expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
index c2e8930302..4bc3bec8c7 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
@@ -29,8 +29,10 @@ import java.util.List;
public class MeterConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
+ private String expPrefix;
private String filter;
private List<Rule> metricsRules;
+ private String initExp;
@Data
@NoArgsConstructor
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 05b01868cf..28c2c487f4 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -42,11 +42,12 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.FilterExpression;
import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
-import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationClientSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
@@ -241,10 +242,6 @@ public class Analyzer {
}
}
createMetric(ctx.getScopeType(), metricType.literal, ctx.getDownsampling());
-
- if (ctx.isRetagByK8sMeta()) {
- K8sInfoRegistry.getInstance().start();
- }
}
private void createMetric(final ScopeType scopeType,
@@ -262,16 +259,12 @@ public class Analyzer {
private void generateTraffic(MeterEntity entity) {
if (entity.getDetectPoint() != null) {
- switch (entity.getDetectPoint()) {
- case SERVER:
- entity.setServiceName(entity.getDestServiceName());
- toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
- serverSide(entity);
+ switch (entity.getScopeType()) {
+ case SERVICE_RELATION:
+ serviceRelationTraffic(entity);
break;
- case CLIENT:
- entity.setServiceName(entity.getSourceServiceName());
- toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
- clientSide(entity);
+ case PROCESS_RELATION:
+ processRelationTraffic(entity);
break;
default:
}
@@ -309,7 +302,23 @@ public class Analyzer {
MetricsStreamProcessor.getInstance().in(s);
}
- private void serverSide(MeterEntity entity) {
+ private void serviceRelationTraffic(MeterEntity entity) {
+ switch (entity.getDetectPoint()) {
+ case SERVER:
+ entity.setServiceName(entity.getDestServiceName());
+ toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
+ serviceRelationServerSide(entity);
+ break;
+ case CLIENT:
+ entity.setServiceName(entity.getSourceServiceName());
+ toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
+ serviceRelationClientSide(entity);
+ break;
+ default:
+ }
+ }
+
+ private void serviceRelationServerSide(MeterEntity entity) {
ServiceRelationServerSideMetrics metrics = new ServiceRelationServerSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
@@ -319,7 +328,7 @@ public class Analyzer {
MetricsStreamProcessor.getInstance().in(metrics);
}
- private void clientSide(MeterEntity entity) {
+ private void serviceRelationClientSide(MeterEntity entity) {
ServiceRelationClientSideMetrics metrics = new ServiceRelationClientSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
@@ -328,4 +337,37 @@ public class Analyzer {
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}
+
+ private void processRelationTraffic(MeterEntity entity) {
+ switch (entity.getDetectPoint()) {
+ case SERVER:
+ processRelationServerSide(entity);
+ break;
+ case CLIENT:
+ processRelationClientSide(entity);
+ break;
+ default:
+ }
+ }
+
+ private void processRelationServerSide(MeterEntity entity) {
+ ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+ metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ metrics.setServiceInstanceId(entity.serviceInstanceId());
+ metrics.setSourceProcessId(entity.getSourceProcessId());
+ metrics.setDestProcessId(entity.getDestProcessId());
+ metrics.setEntityId(entity.id());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+ private void processRelationClientSide(MeterEntity entity) {
+ ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+ metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ metrics.setServiceInstanceId(entity.serviceInstanceId());
+ metrics.setSourceProcessId(entity.getSourceProcessId());
+ metrics.setDestProcessId(entity.getDestProcessId());
+ metrics.setEntityId(entity.id());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
index 07ff140a1e..54a3e79261 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
@@ -26,6 +26,11 @@ import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
+import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingException;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
@@ -48,14 +53,28 @@ public class MetricConvert {
public MetricConvert(MetricRuleConfig rule, MeterSystem service) {
Preconditions.checkState(!Strings.isNullOrEmpty(rule.getMetricPrefix()));
+ // init expression script
+ if (StringUtils.isNotEmpty(rule.getInitExp())) {
+ handleInitExp(rule.getInitExp());
+ }
+
this.analyzers = rule.getMetricsRules().stream().map(
- r -> Analyzer.build(
- formatMetricName(rule, r.getName()),
- rule.getFilter(),
- Strings.isNullOrEmpty(rule.getExpSuffix()) ?
- r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
- service
- )
+ r -> {
+ String exp = r.getExp();
+ if (!Strings.isNullOrEmpty(rule.getExpPrefix())) {
+ exp = String.format("(%s.%s).%s", StringUtils.substringBefore(exp, "."), rule.getExpPrefix(),
+ StringUtils.substringAfter(exp, "."));
+ }
+ if (!Strings.isNullOrEmpty(rule.getExpSuffix())) {
+ exp = String.format("(%s).%s", exp, rule.getExpSuffix());
+ }
+ return Analyzer.build(
+ formatMetricName(rule, r.getName()),
+ rule.getFilter(),
+ exp,
+ service
+ );
+ }
).collect(toList());
}
@@ -83,4 +102,13 @@ public class MetricConvert {
metricName.add(rule.getMetricPrefix()).add(meterRuleName);
return metricName.toString();
}
+
+ private void handleInitExp(String exp) {
+ Expression e = DSL.parse(exp);
+ final Result result = e.run(ImmutableMap.of());
+ if (!result.isSuccess() && result.isThrowable()) {
+ throw new ExpressionParsingException(
+ "failed to execute init expression: " + exp + ", error:" + result.getError());
+ }
+ }
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
index 831f4c8a87..2e75b1d89c 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
@@ -35,6 +35,11 @@ public interface MetricRuleConfig {
*/
String getExpSuffix();
+ /**
+ * Get MAL expression prefix
+ */
+ String getExpPrefix();
+
/**
* Get all rules
*/
@@ -42,6 +47,11 @@ public interface MetricRuleConfig {
String getFilter();
+ /**
+ * Get the init expression script
+ */
+ String getInitExp();
+
interface RuleConfig {
/**
* Get definition metrics name
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
index ee0b9debdd..d4f0415c03 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
@@ -25,7 +25,10 @@ import groovy.util.DelegatingScript;
import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;
+
+import org.apache.skywalking.oap.meter.analyzer.dsl.registry.ProcessRegistry;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
+import org.apache.skywalking.oap.meter.analyzer.k8s.Kubernetes;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.codehaus.groovy.ast.stmt.DoWhileStatement;
@@ -54,6 +57,8 @@ public final class DSL {
icz.addImport("K8sRetagType", K8sRetagType.class.getName());
icz.addImport("DetectPoint", DetectPoint.class.getName());
icz.addImport("Layer", Layer.class.getName());
+ icz.addImport("ProcessRegistry", ProcessRegistry.class.getName());
+ icz.addImport("Kubernetes", Kubernetes.class.getName());
cc.addCompilationCustomizers(icz);
final SecureASTCustomizer secureASTCustomizer = new SecureASTCustomizer();
@@ -73,7 +78,9 @@ public final class DSL {
.add(K8sRetagType.class)
.add(DetectPoint.class)
.add(Layer.class)
- .build());
+ .add(ProcessRegistry.class)
+ .add(Kubernetes.class)
+ .build());
cc.addCompilationCustomizers(secureASTCustomizer);
GroovyShell sh = new GroovyShell(new Binding(), cc);
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java
new file mode 100644
index 0000000000..8851bbf9b5
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/EntityDescription/ProcessRelationEntityDescription.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription;
+
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+
+import java.util.List;
+
+@Getter
+@RequiredArgsConstructor
+@ToString
+public class ProcessRelationEntityDescription implements EntityDescription {
+ private final ScopeType scopeType = ScopeType.PROCESS_RELATION;
+ private final List<String> serviceKeys;
+ private final List<String> instanceKeys;
+ private final String sourceProcessIdKey;
+ private final String destProcessIdKey;
+ private final String detectPointKey;
+ private final String delimiter;
+
+ @Override
+ public List<String> getLabelKeys() {
+ return ImmutableList.<String>builder()
+ .addAll(serviceKeys)
+ .addAll(instanceKeys)
+ .add(detectPointKey, sourceProcessIdKey, destProcessIdKey).build();
+ }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index f571ccae39..a09eaedb1f 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -71,11 +71,6 @@ public class ExpressionParsingContext implements Closeable {
ScopeType scopeType;
- /**
- * Mark whether the retagByK8sMeta func in expressions is active
- */
- boolean isRetagByK8sMeta;
-
/**
* Get labels no scope related.
*
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index 93f88cc6d1..acc7e9c489 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -23,9 +23,12 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
+import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ProcessRelationEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceRelationEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
@@ -36,6 +39,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -358,7 +362,6 @@ public class SampleFamily {
Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceLabelName));
- ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
if (this == EMPTY) {
return EMPTY;
}
@@ -497,6 +500,38 @@ public class SampleFamily {
return createMeterSamples(new ServiceRelationEntityDescription(sourceServiceKeys, destServiceKeys, detectPoint, layer, Const.POINT));
}
+ public SampleFamily forEach(List<String> array, Closure<Void> each) {
+ if (this == EMPTY) {
+ return EMPTY;
+ }
+ return SampleFamily.build(this.context, Arrays.stream(this.samples).map(sample -> {
+ Map<String, String> labels = Maps.newHashMap(sample.getLabels());
+ for (String element : array) {
+ each.call(element, labels);
+ }
+ return sample.toBuilder().labels(ImmutableMap.copyOf(labels)).build();
+ }).toArray(Sample[]::new));
+ }
+
+ public SampleFamily processRelation(String detectPointKey, List<String> serviceKeys, List<String> instanceKeys, String sourceProcessIdKey, String destProcessIdKey) {
+ Preconditions.checkArgument(serviceKeys.size() > 0);
+ Preconditions.checkArgument(instanceKeys.size() > 0);
+ Preconditions.checkArgument(StringUtil.isNotEmpty(sourceProcessIdKey));
+ Preconditions.checkArgument(StringUtil.isNotEmpty(destProcessIdKey));
+ ExpressionParsingContext.get().ifPresent(ctx -> {
+ ctx.scopeType = ScopeType.PROCESS_RELATION;
+ ctx.scopeLabels.addAll(serviceKeys);
+ ctx.scopeLabels.addAll(instanceKeys);
+ ctx.scopeLabels.add(detectPointKey);
+ ctx.scopeLabels.add(sourceProcessIdKey);
+ ctx.scopeLabels.add(destProcessIdKey);
+ });
+ if (this == EMPTY) {
+ return EMPTY;
+ }
+ return createMeterSamples(new ProcessRelationEntityDescription(serviceKeys, instanceKeys, sourceProcessIdKey, destProcessIdKey, detectPointKey, Const.POINT));
+ }
+
private SampleFamily createMeterSamples(EntityDescription entityDescription) {
Map<MeterEntity, Sample[]> meterSamples = new HashMap<>();
Arrays.stream(samples)
@@ -651,6 +686,17 @@ public class SampleFamily {
InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys(), serviceRelationEntityDescription.getDelimiter()),
serviceRelationEntityDescription.getDetectPoint(), serviceRelationEntityDescription.getLayer()
);
+ case PROCESS_RELATION:
+ final ProcessRelationEntityDescription processRelationEntityDescription = (ProcessRelationEntityDescription) entityDescription;
+ final String detectPointValue = InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getDetectPointKey()), processRelationEntityDescription.getDelimiter());
+ DetectPoint point = StringUtils.equalsAnyIgnoreCase(detectPointValue, "server") ? DetectPoint.SERVER : DetectPoint.CLIENT;
+ return MeterEntity.newProcessRelation(
+ InternalOps.dim(samples, processRelationEntityDescription.getServiceKeys(), processRelationEntityDescription.getDelimiter()),
+ InternalOps.dim(samples, processRelationEntityDescription.getInstanceKeys(), processRelationEntityDescription.getDelimiter()),
+ InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getSourceProcessIdKey()), processRelationEntityDescription.getDelimiter()),
+ InternalOps.dim(samples, Collections.singletonList(processRelationEntityDescription.getDestProcessIdKey()), processRelationEntityDescription.getDelimiter()),
+ point
+ );
default:
throw new UnexpectedException(
"Unexpected scope type of entityDescription " + entityDescription);
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java
new file mode 100644
index 0000000000..5298fa9b6c
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/registry/ProcessRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.registry;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+
+/**
+ * The dynamic entity registry for {@link ProcessTraffic}
+ */
+public class ProcessRegistry {
+
+ public static final String LOCAL_VIRTUAL_PROCESS = "UNKNOWN_LOCAL";
+ public static final String REMOTE_VIRTUAL_PROCESS = "UNKNOWN_REMOTE";
+
+ /**
+ * Generate virtual local process under the instance
+ * @return the process id
+ */
+ public static String generateVirtualLocalProcess(String service, String instance) {
+ return generateVirtualProcess(service, instance, LOCAL_VIRTUAL_PROCESS);
+ }
+
+ /**
+ * Generate virtual remote process under the instance
+ * trying to generate the name in the kubernetes environment through the remote address
+ * @return the process id
+ */
+ public static String generateVirtualRemoteProcess(String service, String instance, String remoteAddress) {
+ // remove port
+ String ip = StringUtils.substringBeforeLast(remoteAddress, ":");
+
+ // find remote through k8s metadata
+ String name = K8sInfoRegistry.getInstance().findPodByIP(ip);
+ if (StringUtils.isEmpty(name)) {
+ name = K8sInfoRegistry.getInstance().findServiceByIP(ip);
+ }
+ // if not exists, then just use remote unknown
+ if (StringUtils.isEmpty(name)) {
+ name = REMOTE_VIRTUAL_PROCESS;
+ }
+
+ return generateVirtualProcess(service, instance, name);
+ }
+
+ private static String generateVirtualProcess(String service, String instance, String processName) {
+ final ProcessTraffic traffic = new ProcessTraffic();
+ final String serviceId = IDManager.ServiceID.buildId(service, true);
+ traffic.setServiceId(serviceId);
+ traffic.setInstanceId(IDManager.ServiceInstanceID.buildId(serviceId, instance));
+ traffic.setName(processName);
+ traffic.setDetectType(ProcessDetectType.VIRTUAL.value());
+ traffic.setTimeBucket(TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
+ MetricsStreamProcessor.getInstance().in(traffic);
+ return traffic.id();
+ }
+}
\ No newline at end of file
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
index eb318bdd8b..d44bb84a39 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -24,13 +24,17 @@ import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1LoadBalancerIngress;
+import io.kubernetes.client.openapi.models.V1LoadBalancerStatus;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
+import io.kubernetes.client.openapi.models.V1ServiceStatus;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1Pod;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,6 +44,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
@@ -52,6 +58,8 @@ public class K8sInfoRegistry {
private final Map<String/* podName.namespace */, V1Pod> namePodMap = new ConcurrentHashMap<>();
protected final Map<String/* serviceName.namespace */, V1Service> nameServiceMap = new ConcurrentHashMap<>();
private final Map<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
+ private final Map<String/* podIP */, String /* podName.namespace */> ipPodMap = new ConcurrentHashMap<>();
+ private final Map<String/* serviceIP */, String /* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
private ExecutorService executor;
private static final String SEPARATOR = ".";
@@ -168,6 +176,10 @@ public class K8sInfoRegistry {
ofNullable(service.getMetadata()).ifPresent(
metadata -> nameServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace())
);
+ ofNullable(service.getStatus()).map(V1ServiceStatus::getLoadBalancer).filter(Objects::nonNull)
+ .map(V1LoadBalancerStatus::getIngress).filter(CollectionUtils::isNotEmpty)
+ .ifPresent(l -> l.stream().filter(i -> StringUtil.isNotEmpty(i.getIp()))
+ .forEach(i -> ipServiceMap.remove(i.getIp())));
recompose();
}
@@ -184,10 +196,18 @@ public class K8sInfoRegistry {
ofNullable(pod.getMetadata()).ifPresent(
metadata -> podServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
+
+ ofNullable(pod.getStatus()).filter(s -> StringUtil.isNotEmpty(s.getPodIP())).ifPresent(
+ status -> ipPodMap.remove(status.getPodIP()));
}
private void recompose() {
namePodMap.forEach((podName, pod) -> {
+ if (!isNull(pod.getMetadata())) {
+ ofNullable(pod.getStatus()).filter(s -> StringUtil.isNotEmpty(s.getPodIP())).ifPresent(
+ status -> ipPodMap.put(status.getPodIP(), podName));
+ }
+
nameServiceMap.forEach((serviceName, service) -> {
if (isNull(pod.getMetadata()) || isNull(service.getMetadata()) || isNull(service.getSpec())) {
return;
@@ -213,12 +233,35 @@ public class K8sInfoRegistry {
}
});
});
+ nameServiceMap.forEach((serviceName, service) -> {
+ if (isNull(service.getMetadata()) || isNull(service.getStatus()) || isNull(service.getStatus().getLoadBalancer())) {
+ return;
+ }
+ final List<V1LoadBalancerIngress> ingress = service.getStatus().getLoadBalancer().getIngress();
+ if (CollectionUtils.isEmpty(ingress)) {
+ return;
+ }
+
+ for (V1LoadBalancerIngress loadBalancerIngress : ingress) {
+ if (StringUtil.isNotEmpty(loadBalancerIngress.getIp())) {
+ ipServiceMap.put(loadBalancerIngress.getIp(), serviceName);
+ }
+ }
+ });
}
public String findServiceName(String namespace, String podName) {
return this.podServiceMap.get(podName + SEPARATOR + namespace);
}
+ public String findPodByIP(String ip) {
+ return this.ipPodMap.get(ip);
+ }
+
+ public String findServiceByIP(String ip) {
+ return this.ipServiceMap.get(ip);
+ }
+
private boolean hasIntersection(Collection<?> o, Collection<?> c) {
Objects.requireNonNull(o);
Objects.requireNonNull(c);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
copy to oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
index 3d0a4a26bd..772be04409 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/Kubernetes.java
@@ -16,21 +16,13 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.meter;
+package org.apache.skywalking.oap.meter.analyzer.k8s;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-
-public enum ScopeType {
- SERVICE(DefaultScopeDefine.SERVICE),
- SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
- ENDPOINT(DefaultScopeDefine.ENDPOINT),
- SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION);
-
- @Getter
- private final int scopeId;
-
- ScopeType(final int scopeId) {
- this.scopeId = scopeId;
+public class Kubernetes {
+ /**
+ * Start the listen the kubernetes metadata
+ */
+ public static void startMetadataListener() {
+ K8sInfoRegistry.getInstance().start();
}
}
\ No newline at end of file
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
index ea4e78ab9e..68d6b0a5b9 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
@@ -37,6 +37,8 @@ public class Rule implements MetricRuleConfig {
private StaticConfig staticConfig;
private String metricPrefix;
private String expSuffix;
+ private String expPrefix;
private String filter;
+ private String initExp;
private List<MetricsRule> metricsRules;
}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
index a1a28a7c55..e9f72cbc4a 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FunctionTest.java
@@ -123,6 +123,19 @@ public class FunctionTest {
),
false,
},
+ {
+ "for-each",
+ of("http_success_request", SampleFamilyBuilder.newBuilder(
+ Sample.builder().labels(of("region", "us")).name("http_success_request").build(),
+ Sample.builder().labels(of("region", "cn")).name("http_success_request").build()
+ ).build()),
+ "http_success_request.forEach(['v1', 'v2'], {element, tags -> tags[element] = 'test'})",
+ Result.success(SampleFamilyBuilder.newBuilder(
+ Sample.builder().labels(of("region", "us", "v1", "test", "v2", "test")).name("http_success_request").build(),
+ Sample.builder().labels(of("region", "cn", "v1", "test", "v2", "test")).name("http_success_request").build()
+ ).build()),
+ false,
+ },
});
}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
index 199d8d9945..caa39ac8d0 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -19,17 +19,23 @@
package org.apache.skywalking.oap.meter.analyzer.dsl;
import com.google.common.collect.ImmutableMap;
+import io.kubernetes.client.openapi.models.V1LoadBalancerIngress;
+import io.kubernetes.client.openapi.models.V1LoadBalancerStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodStatus;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+
+import io.kubernetes.client.openapi.models.V1ServiceStatus;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.Retag;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -204,6 +210,37 @@ public class K8sTagTest {
).build()),
false,
},
+ {
+ "IPAddress_to_name",
+ of("rover_network_profiling_process_write_bytes", SampleFamilyBuilder.newBuilder(
+ Sample.builder()
+ .labels(
+ of("service", "test", "instance", "test-instance", "side", "client",
+ "client_address", "1.1.1.1", "server_address", "2.2.2.2")
+ )
+ .value(2)
+ .name("rover_network_profiling_process_write_bytes")
+ .build()
+ ).build()),
+ "rover_network_profiling_process_write_bytes.forEach(['client', 'server'] , " +
+ "{prefix, tags -> tags[prefix + '_process_id'] = ProcessRegistry.generateVirtualRemoteProcess(tags.service, tags.instance, tags[prefix + '_address'])})",
+ Result.success(SampleFamilyBuilder.newBuilder(
+ Sample.builder()
+ .labels(
+ of("service", "test", "instance", "test-instance", "side", "client",
+ "client_address", "1.1.1.1", "client_process_id", IDManager.ProcessID.buildId(
+ IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId("test", true), "test-instance"),
+ "my-nginx-5dc4865748-mbczh.default"),
+ "server_address", "2.2.2.2", "server_process_id", IDManager.ProcessID.buildId(
+ IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId("test", true), "test-instance"),
+ "kube-state-metrics.kube-system"))
+ )
+ .value(2)
+ .name("rover_network_profiling_process_write_bytes")
+ .build()
+ ).build()),
+ false,
+ }
});
}
@@ -213,36 +250,37 @@ public class K8sTagTest {
Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE",
Mockito.spy(K8sInfoRegistry.getInstance())
);
+ PowerMockito.doNothing().when(K8sInfoRegistry.getInstance(), "start");
PowerMockito.when(
- K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+ K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addService",
- mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"))
+ mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2")
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
- mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+ mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
- mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"))
+ mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2")
).thenCallRealMethod();
PowerMockito.when(
- K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx")))
+ K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "removePod",
- mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+ mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
).thenCallRealMethod();
PowerMockito.when(
- K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+ K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
- mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+ mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1")
).thenCallRealMethod();
}
@@ -266,7 +304,7 @@ public class K8sTagTest {
assertThat(r, is(want));
}
- private V1Service mockService(String name, String namespace, Map<String, String> selector) {
+ private V1Service mockService(String name, String namespace, Map<String, String> selector, String ipAddress) {
V1Service service = new V1Service();
V1ObjectMeta serviceMeta = new V1ObjectMeta();
V1ServiceSpec v1ServiceSpec = new V1ServiceSpec();
@@ -277,15 +315,26 @@ public class K8sTagTest {
v1ServiceSpec.setSelector(selector);
service.setSpec(v1ServiceSpec);
+ final V1ServiceStatus v1ServiceStatus = new V1ServiceStatus();
+ final V1LoadBalancerStatus balancerStatus = new V1LoadBalancerStatus();
+ final V1LoadBalancerIngress loadBalancerIngress = new V1LoadBalancerIngress();
+ loadBalancerIngress.setIp(ipAddress);
+ balancerStatus.setIngress(Arrays.asList(loadBalancerIngress));
+ v1ServiceStatus.setLoadBalancer(balancerStatus);
+ service.setStatus(v1ServiceStatus);
+
return service;
}
- private V1Pod mockPod(String name, String namespace, Map<String, String> labels) {
+ private V1Pod mockPod(String name, String namespace, Map<String, String> labels, String ipAddress) {
V1Pod v1Pod = new V1Pod();
V1ObjectMeta podMeta = new V1ObjectMeta();
podMeta.setName(name);
podMeta.setNamespace(namespace);
podMeta.setLabels(labels);
+ final V1PodStatus status = new V1PodStatus();
+ status.setPodIP(ipAddress);
+ v1Pod.setStatus(status);
v1Pod.setMetadata(podMeta);
return v1Pod;
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
index 0cd95c7ab4..24882ecaa8 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ScopeTest.java
@@ -532,6 +532,46 @@ public class ScopeTest {
);
}
}
+ },
+ {
+ "sum_process_relation",
+ of("rover_network_profiling_process_write_bytes", SampleFamilyBuilder.newBuilder(
+ Sample.builder()
+ .labels(of("service", "test", "instance", "test-instance", "side", "server", "client_process_id", "abc", "server_process_id", "def"))
+ .value(11)
+ .name("rover_network_profiling_process_write_bytes")
+ .build(),
+ Sample.builder()
+ .labels(of("service", "test", "instance", "test-instance", "side", "client", "client_process_id", "abc", "server_process_id", "def"))
+ .value(12)
+ .name("rover_network_profiling_process_write_bytes")
+ .build()
+ ).build()),
+ "rover_network_profiling_process_write_bytes.sum(['service' ,'instance', 'side', 'client_process_id', 'server_process_id'])" +
+ ".processRelation('side', ['service'], ['instance'], 'client_process_id', 'server_process_id')",
+ false,
+ new HashMap<MeterEntity, Sample[]>() {
+ {
+ put(
+ MeterEntity.newProcessRelation("test", "test-instance", "abc", "def", DetectPoint.SERVER),
+ new Sample[] {
+ Sample.builder()
+ .labels(of())
+ .value(11)
+ .name("rover_network_profiling_process_write_bytes").build()
+ }
+ );
+ put(
+ MeterEntity.newProcessRelation("test", "test-instance", "abc", "def", DetectPoint.CLIENT),
+ new Sample[] {
+ Sample.builder()
+ .labels(of())
+ .value(12)
+ .name("rover_network_profiling_process_write_bytes").build()
+ }
+ );
+ }
+ }
}
});
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
index dc09772c56..cf67d0eb62 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
@@ -268,6 +268,32 @@ public class IDManager {
return Hashing.sha256().newHasher().putString(String.format("%s_%s",
name, instanceId), Charsets.UTF_8).hash().toString();
}
+
+ /**
+ * @return encoded process relation id
+ */
+ public static String buildRelationId(ProcessRelationDefine define) {
+ return define.sourceId + Const.RELATION_ID_CONNECTOR + define.destId;
+ }
+
+ /**
+ * @return process relation ID object decoded from {@link #buildRelationId(ProcessRelationDefine)} result
+ */
+ public static ProcessRelationDefine analysisRelationId(String entityId) {
+ String[] parts = entityId.split(Const.RELATION_ID_PARSER_SPLIT);
+ if (parts.length != 2) {
+ throw new RuntimeException("Illegal Process Relation entity id");
+ }
+ return new ProcessRelationDefine(parts[0], parts[1]);
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ @EqualsAndHashCode
+ public static class ProcessRelationDefine {
+ private final String sourceId;
+ private final String destId;
+ }
}
/**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java
new file mode 100644
index 0000000000..35f45b2bca
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationClientSideMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@Stream(name = ProcessRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.PROCESS_RELATION,
+ builder = ProcessRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = true, timeRelativeID = true)
+@EqualsAndHashCode(of = {
+ "entityId"
+}, callSuper = true)
+public class ProcessRelationClientSideMetrics extends Metrics {
+
+ public static final String INDEX_NAME = "process_relation_client_side";
+ public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+ public static final String SOURCE_PROCESS_ID = "source_process_id";
+ public static final String DEST_PROCESS_ID = "dest_process_id";
+
+ @Setter
+ @Getter
+ @Column(columnName = SERVICE_INSTANCE_ID)
+ private String serviceInstanceId;
+ @Setter
+ @Getter
+ @Column(columnName = SOURCE_PROCESS_ID)
+ private String sourceProcessId;
+ @Setter
+ @Getter
+ @Column(columnName = DEST_PROCESS_ID)
+ private String destProcessId;
+ @Setter
+ @Getter
+ @Column(columnName = ENTITY_ID, length = 512)
+ private String entityId;
+
+ @Override
+ protected String id0() {
+ return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+ }
+
+ @Override
+ public boolean combine(Metrics metrics) {
+ if (this.getTimeBucket() > metrics.getTimeBucket()) {
+ this.setTimeBucket(metrics.getTimeBucket());
+ }
+ return true;
+ }
+
+ @Override
+ public void calculate() {
+ }
+
+ @Override
+ public Metrics toHour() {
+ return null;
+ }
+
+ @Override
+ public Metrics toDay() {
+ return null;
+ }
+
+ @Override
+ public void deserialize(RemoteData remoteData) {
+ setServiceInstanceId(remoteData.getDataStrings(0));
+ setSourceProcessId(remoteData.getDataStrings(1));
+ setDestProcessId(remoteData.getDataStrings(2));
+ setEntityId(remoteData.getDataStrings(3));
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataStrings(getServiceInstanceId());
+ builder.addDataStrings(getSourceProcessId());
+ builder.addDataStrings(getDestProcessId());
+ builder.addDataStrings(getEntityId());
+ return builder;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ return this.entityId.hashCode();
+ }
+
+ public static class Builder implements StorageBuilder<ProcessRelationClientSideMetrics> {
+ @Override
+ public ProcessRelationClientSideMetrics storage2Entity(final Convert2Entity converter) {
+ ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+ metrics.setServiceInstanceId((String) converter.get(SERVICE_INSTANCE_ID));
+ metrics.setSourceProcessId((String) converter.get(SOURCE_PROCESS_ID));
+ metrics.setDestProcessId((String) converter.get(DEST_PROCESS_ID));
+ metrics.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ metrics.setEntityId((String) converter.get(ENTITY_ID));
+ return metrics;
+ }
+
+ @Override
+ public void entity2Storage(final ProcessRelationClientSideMetrics storageData,
+ final Convert2Storage converter) {
+ converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ converter.accept(SOURCE_PROCESS_ID, storageData.getSourceProcessId());
+ converter.accept(DEST_PROCESS_ID, storageData.getDestProcessId());
+ converter.accept(ENTITY_ID, storageData.getEntityId());
+ }
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java
new file mode 100644
index 0000000000..ccf5ff34f6
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationDispatcher.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ProcessRelation;
+
+public class ProcessRelationDispatcher implements SourceDispatcher<ProcessRelation> {
+ @Override
+ public void dispatch(ProcessRelation source) {
+ switch (source.getDetectPoint()) {
+ case SERVER:
+ processRelationServerSide(source);
+ break;
+ case CLIENT:
+ processRelationClientSide(source);
+ break;
+ }
+ }
+
+ private void processRelationServerSide(ProcessRelation relation) {
+ ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+ metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ metrics.setServiceInstanceId(relation.getInstanceId());
+ metrics.setSourceProcessId(relation.getSourceProcessId());
+ metrics.setDestProcessId(relation.getDestProcessId());
+ metrics.setEntityId(relation.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+ private void processRelationClientSide(ProcessRelation entity) {
+ ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
+ metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ metrics.setServiceInstanceId(entity.getInstanceId());
+ metrics.setSourceProcessId(entity.getSourceProcessId());
+ metrics.setDestProcessId(entity.getDestProcessId());
+ metrics.setEntityId(entity.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java
new file mode 100644
index 0000000000..5dd9c880eb
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/process/ProcessRelationServerSideMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.process;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@Stream(name = ProcessRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.PROCESS_RELATION,
+ builder = ProcessRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = true, timeRelativeID = true)
+@EqualsAndHashCode(of = {
+ "entityId"
+}, callSuper = true)
+public class ProcessRelationServerSideMetrics extends Metrics {
+
+ public static final String INDEX_NAME = "process_relation_server_side";
+ public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+ public static final String SOURCE_PROCESS_ID = "source_process_id";
+ public static final String DEST_PROCESS_ID = "dest_process_id";
+
+ @Setter
+ @Getter
+ @Column(columnName = SERVICE_INSTANCE_ID)
+ private String serviceInstanceId;
+ @Setter
+ @Getter
+ @Column(columnName = SOURCE_PROCESS_ID)
+ private String sourceProcessId;
+ @Setter
+ @Getter
+ @Column(columnName = DEST_PROCESS_ID)
+ private String destProcessId;
+ @Setter
+ @Getter
+ @Column(columnName = ENTITY_ID, length = 512)
+ private String entityId;
+
+ @Override
+ protected String id0() {
+ return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+ }
+
+ @Override
+ public boolean combine(Metrics metrics) {
+ if (this.getTimeBucket() > metrics.getTimeBucket()) {
+ this.setTimeBucket(metrics.getTimeBucket());
+ }
+ return true;
+ }
+
+ @Override
+ public void calculate() {
+ }
+
+ @Override
+ public Metrics toHour() {
+ return null;
+ }
+
+ @Override
+ public Metrics toDay() {
+ return null;
+ }
+
+ @Override
+ public void deserialize(RemoteData remoteData) {
+ setServiceInstanceId(remoteData.getDataStrings(0));
+ setSourceProcessId(remoteData.getDataStrings(1));
+ setDestProcessId(remoteData.getDataStrings(2));
+ setEntityId(remoteData.getDataStrings(3));
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataStrings(getServiceInstanceId());
+ builder.addDataStrings(getSourceProcessId());
+ builder.addDataStrings(getDestProcessId());
+ builder.addDataStrings(getEntityId());
+ return builder;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ return this.entityId.hashCode();
+ }
+
+ public static class Builder implements StorageBuilder<ProcessRelationServerSideMetrics> {
+ @Override
+ public ProcessRelationServerSideMetrics storage2Entity(final Convert2Entity converter) {
+ ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
+ metrics.setServiceInstanceId((String) converter.get(SERVICE_INSTANCE_ID));
+ metrics.setSourceProcessId((String) converter.get(SOURCE_PROCESS_ID));
+ metrics.setDestProcessId((String) converter.get(DEST_PROCESS_ID));
+ metrics.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ metrics.setEntityId((String) converter.get(ENTITY_ID));
+ return metrics;
+ }
+
+ @Override
+ public void entity2Storage(final ProcessRelationServerSideMetrics storageData,
+ final Convert2Storage converter) {
+ converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ converter.accept(SOURCE_PROCESS_ID, storageData.getSourceProcessId());
+ converter.accept(DEST_PROCESS_ID, storageData.getDestProcessId());
+ converter.accept(ENTITY_ID, storageData.getEntityId());
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
index 2c8b8a15cc..eb40eb1593 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -44,6 +44,8 @@ public class MeterEntity {
private String endpointName;
private String sourceServiceName;
private String destServiceName;
+ private String sourceProcessId;
+ private String destProcessId;
private DetectPoint detectPoint;
private Layer layer;
@@ -66,6 +68,11 @@ public class MeterEntity {
sourceServiceId(),
destServiceId()
));
+ case PROCESS_RELATION:
+ return IDManager.ProcessID.buildRelationId(new IDManager.ProcessID.ProcessRelationDefine(
+ sourceProcessId,
+ destProcessId
+ ));
default:
throw new UnexpectedException("Unexpected scope type of entity " + this.toString());
}
@@ -75,6 +82,10 @@ public class MeterEntity {
return IDManager.ServiceID.buildId(serviceName, true);
}
+ public String serviceInstanceId() {
+ return IDManager.ServiceInstanceID.buildId(serviceId(), instanceName);
+ }
+
public String sourceServiceId() {
return IDManager.ServiceID.buildId(sourceServiceName, true);
}
@@ -138,4 +149,16 @@ public class MeterEntity {
meterEntity.layer = layer;
return meterEntity;
}
+
+ public static MeterEntity newProcessRelation(String serviceName, String instanceName,
+ String sourceProcessId, String destProcessId, DetectPoint detectPoint) {
+ final MeterEntity meterEntity = new MeterEntity();
+ meterEntity.scopeType = ScopeType.PROCESS_RELATION;
+ meterEntity.serviceName = serviceName;
+ meterEntity.instanceName = instanceName;
+ meterEntity.sourceProcessId = sourceProcessId;
+ meterEntity.destProcessId = destProcessId;
+ meterEntity.detectPoint = detectPoint;
+ return meterEntity;
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
index 3d0a4a26bd..4f89cc77c2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/ScopeType.java
@@ -25,7 +25,8 @@ public enum ScopeType {
SERVICE(DefaultScopeDefine.SERVICE),
SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
ENDPOINT(DefaultScopeDefine.ENDPOINT),
- SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION);
+ SERVICE_RELATION(DefaultScopeDefine.SERVICE_RELATION),
+ PROCESS_RELATION(DefaultScopeDefine.PROCESS_RELATION);
@Getter
private final int scopeId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 4c99e6902b..7e0eec94d7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -105,6 +105,7 @@ public class DefaultScopeDefine {
public static final int ZIPKIN_SERVICE = 51;
public static final int ZIPKIN_SERVICE_SPAN = 52;
public static final int ZIPKIN_SERVICE_RELATION = 53;
+ public static final int PROCESS_RELATION = 54;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
@@ -116,6 +117,7 @@ public class DefaultScopeDefine {
public static final String SERVICE_INSTANCE_RELATION_CATALOG_NAME = "SERVICE_INSTANCE_RELATION";
public static final String ENDPOINT_RELATION_CATALOG_NAME = "ENDPOINT_RELATION";
public static final String PROCESS_CATALOG_NAME = "PROCESS";
+ public static final String PROCESS_RELATION_CATALOG_NAME = "PROCESS_RELATION";
private static final Map<Integer, Boolean> SERVICE_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java
new file mode 100644
index 0000000000..191858bd97
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ProcessRelation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS_RELATION;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS_RELATION_CATALOG_NAME;
+
+@ScopeDeclaration(id = PROCESS_RELATION, name = "ProcessRelation", catalog = PROCESS_RELATION_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class ProcessRelation extends Source {
+ private String entityId;
+
+ @Override
+ public int scope() {
+ return PROCESS_RELATION;
+ }
+
+ @Override
+ public String getEntityId() {
+ if (StringUtils.isEmpty(entityId)) {
+ entityId = IDManager.ProcessID.buildRelationId(
+ new IDManager.ProcessID.ProcessRelationDefine(
+ sourceProcessId,
+ destProcessId
+ )
+ );
+ }
+ return entityId;
+ }
+
+ @Getter
+ @Setter
+ private String instanceId;
+ @Getter
+ @Setter
+ private String sourceProcessId;
+ @Getter
+ @Setter
+ private String destProcessId;
+ @Getter
+ @Setter
+ private DetectPoint detectPoint;
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
index b167c5f730..c419d38d15 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
@@ -28,7 +28,9 @@ public class ZabbixConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
+ private String expPrefix;
private String filter;
+ private String initExp;
private Entities entities;
private List<String> requiredZabbixItemKeys;
private List<Metric> metrics;
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
index 8310351653..a0d98f67ab 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-cluster.yaml
@@ -28,6 +28,7 @@
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
+initExp: Kubernetes.startMetadataListener()
filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
expSuffix: tag({tags -> tags.cluster = 'k8s-cluster::' + tags.cluster}).service(['cluster'], Layer.K8S)
metricPrefix: k8s_cluster
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
index 8d723fbd4b..28ff4696a9 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-instance.yaml
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+initExp: Kubernetes.startMetadataListener()
filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
expSuffix: |-
service(['cluster' , 'service'], '::', Layer.K8S_SERVICE)
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
index 0bdcd5e400..9a72f81f74 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/k8s-service.yaml
@@ -28,6 +28,7 @@
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
+initExp: Kubernetes.startMetadataListener()
filter: "{ tags -> tags.job_name in [ 'kubernetes-cadvisor', 'kube-state-metrics' ] }" # The OpenTelemetry job name
expSuffix: service(['cluster' , 'service'], '::', Layer.K8S_SERVICE)
metricPrefix: k8s_service