You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/11/19 11:45:33 UTC
[skywalking] 01/01: Add filter mechanism in MAL core and fix some bugs
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch mal/enhance
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 15f7e0039d0b0846622c121f1bd5b24dd8a1b2fc
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Nov 19 19:45:13 2021 +0800
Add filter mechanism in MAL core and fix some bugs
Also fixed some minor bugs
* Fix concurrency bug in MAL `increase`-related calculation.
* Fix a null pointer bug when building `SampleFamily`.
Closes https://github.com/apache/skywalking/issues/8145
---
CHANGES.md | 3 +
docs/en/setup/backend/backend-telemetry.md | 2 +-
.../provider/meter/config/MeterConfig.java | 1 +
.../provider/meter/process/MeterProcessor.java | 2 +-
.../skywalking/oap/meter/analyzer/Analyzer.java | 26 +++-
.../oap/meter/analyzer/MetricConvert.java | 1 +
.../oap/meter/analyzer/MetricRuleConfig.java | 2 +
.../oap/meter/analyzer/dsl/Expression.java | 9 +-
.../oap/meter/analyzer/dsl/FilterExpression.java | 54 ++++++++
.../oap/meter/analyzer/dsl/SampleFamily.java | 138 +++++++++++++++------
.../meter/analyzer/dsl/counter/CounterWindow.java | 15 +--
.../oap/meter/analyzer/prometheus/rule/Rule.java | 1 +
.../oap/meter/analyzer/dsl/AnalyzerTest.java | 3 +
.../oap/meter/analyzer/dsl/FilterTest.java | 95 ++++++++++++++
.../server/receiver/otel/oc/OCMetricHandler.java | 5 +
.../zabbix/provider/config/ZabbixConfig.java | 1 +
.../src/main/resources/otel-oc-rules/oap.yaml | 1 +
.../src/main/resources/otel-oc-rules/vm.yaml | 1 +
.../otel-collector-config.yaml | 2 +-
19 files changed, 298 insertions(+), 64 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3d6738e..304a2ab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,9 @@ Release Notes.
* Add `MicroBench` module to make it easier for developers to write JMH test.
* Upgrade Kubernetes Java client to 14.0.0, supports GCP token refreshing and fixes some bugs.
* Change `SO11Y` metric `envoy_als_in_count` to calculate the ALS message count.
+* Add filter mechanism in MAL core to filter metrics.
+* Fix concurrency bug in MAL `increase`-related calculation.
+* Fix a null pointer bug when building `SampleFamily`.
#### UI
diff --git a/docs/en/setup/backend/backend-telemetry.md b/docs/en/setup/backend/backend-telemetry.md
index 5c7f8e9..e0a76d6 100644
--- a/docs/en/setup/backend/backend-telemetry.md
+++ b/docs/en/setup/backend/backend-telemetry.md
@@ -128,7 +128,7 @@ Set this up following these steps:
```
2. Set up OpenTelemetry Collector and config a scrape job:
``` yaml
-- job_name: 'skywalking'
+- job_name: 'skywalking-so11y' # make sure to use this in the so11y.yaml to filter only so11y metrics
metrics_path: '/metrics'
kubernetes_sd_configs:
- role: pod
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
index 121af54..c2e8930 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
@@ -29,6 +29,7 @@ import java.util.List;
public class MeterConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
+ private String filter;
private List<Rule> metricsRules;
@Data
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
index 14a41e1..698075d 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
@@ -133,7 +133,7 @@ public class MeterProcessor {
}
try {
- converts.stream().forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
+ converts.forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
Map.Entry::getKey,
v -> SampleFamilyBuilder.newBuilder(
v.getValue().stream().map(s -> s.build(service, serviceInstance, timestamp)).toArray(Sample[]::new)
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 2770f88..58c9e53 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.meter.analyzer;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.vavr.Tuple;
import io.vavr.Tuple2;
@@ -36,6 +37,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
import org.apache.skywalking.oap.meter.analyzer.dsl.DownsamplingType;
import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingContext;
+import org.apache.skywalking.oap.meter.analyzer.dsl.FilterExpression;
import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
@@ -56,6 +58,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileA
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
@@ -74,11 +77,17 @@ public class Analyzer {
public static final Tuple2<String, SampleFamily> NIL = Tuple.of("", null);
- public static Analyzer build(final String metricName, final String expression,
+ public static Analyzer build(final String metricName,
+ final String filterExpression,
+ final String expression,
final MeterSystem meterSystem) {
Expression e = DSL.parse(expression);
+ FilterExpression filter = null;
+ if (!Strings.isNullOrEmpty(filterExpression)) {
+ filter = new FilterExpression(filterExpression);
+ }
ExpressionParsingContext ctx = e.parse();
- Analyzer analyzer = new Analyzer(metricName, e, meterSystem);
+ Analyzer analyzer = new Analyzer(metricName, filter, e, meterSystem);
analyzer.init(ctx);
return analyzer;
}
@@ -89,6 +98,8 @@ public class Analyzer {
private final String metricName;
+ private final FilterExpression filterExpression;
+
private final Expression expression;
private final MeterSystem meterSystem;
@@ -103,16 +114,19 @@ public class Analyzer {
* @param sampleFamilies input samples.
*/
public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
- ImmutableMap<String, SampleFamily> input = samples.stream()
- .map(s -> Tuple.of(s, sampleFamilies.get(s)))
- .filter(t -> t._2 != null)
- .collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+ Map<String, SampleFamily> input = samples.stream()
+ .map(s -> Tuple.of(s, sampleFamilies.get(s)))
+ .filter(t -> t._2 != null)
+ .collect(toImmutableMap(t -> t._1, t -> t._2));
if (input.size() < 1) {
if (log.isDebugEnabled()) {
log.debug("{} is ignored due to the lack of {}", expression, samples);
}
return;
}
+ if (filterExpression != null) {
+ input = filterExpression.filter(input);
+ }
Result r = expression.run(input);
if (!r.isSuccess()) {
return;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
index c7bec66..07ff140 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
@@ -51,6 +51,7 @@ public class MetricConvert {
this.analyzers = rule.getMetricsRules().stream().map(
r -> Analyzer.build(
formatMetricName(rule, r.getName()),
+ rule.getFilter(),
Strings.isNullOrEmpty(rule.getExpSuffix()) ?
r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
service
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
index 64f96cc..831f4c8 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
@@ -40,6 +40,8 @@ public interface MetricRuleConfig {
*/
List<? extends RuleConfig> getMetricsRules();
+ String getFilter();
+
interface RuleConfig {
/**
* Get definition metrics name
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index e9a0f6e..dce4af3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,6 +23,7 @@ import groovy.lang.ExpandoMetaClass;
import groovy.lang.GroovyObjectSupport;
import groovy.util.DelegatingScript;
import java.time.Instant;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,7 @@ public class Expression {
private final DelegatingScript expression;
- private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+ private final ThreadLocal<Map<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
public Expression(final String literal, final DelegatingScript expression) {
this.literal = literal;
@@ -71,7 +72,7 @@ public class Expression {
* @param sampleFamilies a data map includes all of candidates to be analysis.
* @return The result of execution.
*/
- public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+ public Result run(final Map<String, SampleFamily> sampleFamilies) {
propertyRepository.set(sampleFamilies);
try {
SampleFamily sf = (SampleFamily) expression.run();
@@ -114,7 +115,7 @@ public class Expression {
public static final DownsamplingType LATEST = DownsamplingType.LATEST;
private final String literal;
- private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository;
+ private final ThreadLocal<Map<String, SampleFamily>> propertyRepository;
public SampleFamily propertyMissing(String metricName) {
ExpressionParsingContext.get().ifPresent(ctx -> {
@@ -122,7 +123,7 @@ public class Expression {
ctx.samples.add(metricName);
}
});
- ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+ Map<String, SampleFamily> sampleFamilies = propertyRepository.get();
if (sampleFamilies == null) {
return SampleFamily.EMPTY;
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
new file mode 100644
index 0000000..69ffe9b
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import groovy.lang.Closure;
+import groovy.lang.GroovyShell;
+import java.util.Map;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import static java.util.stream.Collectors.toMap;
+
+@Slf4j
+@ToString(of = {"literal"})
+public class FilterExpression {
+ private final String literal;
+ private final Closure<Boolean> filterClosure;
+
+ @SuppressWarnings("unchecked")
+ public FilterExpression(final String literal) {
+ this.literal = literal;
+
+ GroovyShell sh = new GroovyShell();
+ filterClosure = (Closure<Boolean>) sh.evaluate(literal);
+ }
+
+ public Map<String, SampleFamily> filter(final Map<String, SampleFamily> sampleFamilies) {
+ try {
+ return sampleFamilies.entrySet().stream().collect(toMap(
+ Map.Entry::getKey,
+ it -> it.getValue().filter(filterClosure)
+ ));
+ } catch (Throwable t) {
+ log.error("failed to run \"{}\"", literal, t);
+ }
+ return sampleFamilies;
+ }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index c153033..6be2740 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -18,22 +18,13 @@
package org.apache.skywalking.oap.meter.analyzer.dsl;
-import com.google.common.base.CharMatcher;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.AtomicDouble;
-import groovy.lang.Closure;
-import io.vavr.Function2;
-import io.vavr.Function3;
-import lombok.AccessLevel;
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
+import static java.util.function.UnaryOperator.identity;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
@@ -43,6 +34,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
import java.util.Arrays;
import java.util.Comparator;
@@ -57,13 +49,25 @@ import java.util.function.DoubleBinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static java.util.function.UnaryOperator.identity;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+
+import groovy.lang.Closure;
+import io.vavr.Function2;
+import io.vavr.Function3;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
/**
* SampleFamily represents a collection of {@link Sample}.
@@ -71,13 +75,17 @@ import static java.util.stream.Collectors.toList;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@EqualsAndHashCode
@ToString
+@Slf4j
public class SampleFamily {
public static final SampleFamily EMPTY = new SampleFamily(new Sample[0], RunningContext.EMPTY);
static SampleFamily build(RunningContext ctx, Sample... samples) {
Preconditions.checkNotNull(samples);
- samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
Preconditions.checkArgument(samples.length > 0);
+ samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
+ if (samples.length == 0) {
+ return EMPTY;
+ }
return new SampleFamily(samples, Optional.ofNullable(ctx).orElseGet(RunningContext::instance));
}
@@ -332,6 +340,19 @@ public class SampleFamily {
);
}
+ public SampleFamily filter(Closure<Boolean> filter) {
+ if (this == EMPTY) {
+ return EMPTY;
+ }
+ final Sample[] filtered = Arrays.stream(samples)
+ .filter(it -> filter.call(it.labels))
+ .toArray(Sample[]::new);
+ if (filtered.length == 0) {
+ return EMPTY;
+ }
+ return SampleFamily.build(context, filtered);
+ }
+
/* k8s retags*/
public SampleFamily retagByK8sMeta(String newLabelName,
K8sRetagType type,
@@ -467,8 +488,9 @@ public class SampleFamily {
mapping(identity(), toList())
))
.forEach((labels, samples) -> {
- MeterEntity meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
- meterSamples.put(meterEntity, InternalOps.left(samples, entityDescription.getLabelKeys()));
+ Optional<MeterEntity> meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
+ meterEntity.ifPresent(it -> meterSamples.put(
+ it, InternalOps.left(samples, entityDescription.getLabelKeys())));
});
this.context.setMeterSamples(meterSamples);
@@ -577,34 +599,68 @@ public class SampleFamily {
return CharMatcher.is('.').trimFrom(name);
}
- private static MeterEntity buildMeterEntity(List<Sample> samples,
- EntityDescription entityDescription) {
+ private static Optional<MeterEntity> buildMeterEntity(List<Sample> samples,
+ EntityDescription entityDescription) {
+ final String serviceName;
+ final String serviceInstanceName;
+ final String endpointName;
switch (entityDescription.getScopeType()) {
case SERVICE:
ServiceEntityDescription serviceEntityDescription = (ServiceEntityDescription) entityDescription;
- return MeterEntity.newService(InternalOps.dim(samples, serviceEntityDescription.getServiceKeys()));
+ serviceName = InternalOps.dim(samples, serviceEntityDescription.getServiceKeys());
+ if (Strings.isNullOrEmpty(serviceName)) {
+ log.warn(
+ "service built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+ entityDescription
+ );
+ return Optional.empty();
+ }
+ return Optional.of(MeterEntity.newService(serviceName));
case SERVICE_INSTANCE:
InstanceEntityDescription instanceEntityDescription = (InstanceEntityDescription) entityDescription;
- return MeterEntity.newServiceInstance(
- InternalOps.dim(samples, instanceEntityDescription.getServiceKeys()),
- InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys())
- );
+ serviceName = InternalOps.dim(samples, instanceEntityDescription.getServiceKeys());
+ serviceInstanceName = InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys());
+ if (Strings.isNullOrEmpty(serviceName) ||
+ Strings.isNullOrEmpty(serviceInstanceName)) {
+ log.warn(
+ "service([{}]) or instance([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+ serviceName, serviceInstanceName, entityDescription
+ );
+ return Optional.empty();
+ }
+ return Optional.of(MeterEntity.newServiceInstance(serviceName, serviceInstanceName));
case ENDPOINT:
EndpointEntityDescription endpointEntityDescription = (EndpointEntityDescription) entityDescription;
- return MeterEntity.newEndpoint(
- InternalOps.dim(samples, endpointEntityDescription.getServiceKeys()),
- InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys())
- );
+ serviceName = InternalOps.dim(samples, endpointEntityDescription.getServiceKeys());
+ endpointName = InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys());
+ if (Strings.isNullOrEmpty(serviceName) ||
+ Strings.isNullOrEmpty(endpointName)) {
+ log.warn(
+ "service([{}]) or endpoint([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+ serviceName, endpointName, entityDescription
+ );
+ return Optional.empty();
+ }
+ return Optional.of(MeterEntity.newEndpoint(serviceName, endpointName));
case SERVICE_RELATION:
ServiceRelationEntityDescription serviceRelationEntityDescription = (ServiceRelationEntityDescription) entityDescription;
- return MeterEntity.newServiceRelation(
- InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys()),
- InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys()),
+ String sourceServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys());
+ String destServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys());
+ if (Strings.isNullOrEmpty(sourceServiceName) ||
+ Strings.isNullOrEmpty(destServiceName)) {
+ log.warn(
+ "source service([{}]) or dest service([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+ sourceServiceName, destServiceName, entityDescription
+ );
+ return Optional.empty();
+ }
+ return Optional.of(MeterEntity.newServiceRelation(
+ sourceServiceName, destServiceName,
serviceRelationEntityDescription.getDetectPoint()
- );
+ ));
default:
throw new UnexpectedException(
- "Unexpected scope type of entityDescription " + entityDescription.toString());
+ "Unexpected scope type of entityDescription " + entityDescription);
}
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index ecdc329..b1e4eac 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -19,12 +19,12 @@
package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
@@ -42,16 +42,12 @@ public class CounterWindow {
public static final CounterWindow INSTANCE = new CounterWindow();
- private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
- private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
+ private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
+ private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();
public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
ID id = new ID(name, labels);
- if (!windows.containsKey(id)) {
- windows.put(id, new PriorityQueue<>());
- }
-
- Queue<Tuple2<Long, Double>> window = windows.get(id);
+ Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
window.offer(Tuple.of(now, value));
long waterLevel = now - windowSize;
Tuple2<Long, Double> peek = window.peek();
@@ -77,8 +73,7 @@ public class CounterWindow {
ID id = new ID(name, labels);
Tuple2<Long, Double> element = Tuple.of(now, value);
- Tuple2<Long, Double> result = lastElementMap.get(id);
- lastElementMap.put(id, element);
+ Tuple2<Long, Double> result = lastElementMap.put(id, element);
if (result == null) {
return element;
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
index 4cb24a8..ea4e78a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
@@ -37,5 +37,6 @@ public class Rule implements MetricRuleConfig {
private StaticConfig staticConfig;
private String metricPrefix;
private String expSuffix;
+ private String filter;
private List<MetricsRule> metricsRules;
}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
index e5e1813..4223548 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
@@ -87,6 +87,7 @@ public class AnalyzerTest {
public void testSingle() {
analyzer = Analyzer.build(
"sum_service_instance",
+ null,
"http_success_request.sum(['region', 'idc']).instance(['idc'] , ['region'])",
meterSystem
);
@@ -130,6 +131,7 @@ public class AnalyzerTest {
public void testLabeled() {
analyzer = Analyzer.build(
"sum_service_instance_labels",
+ null,
"http_success_request.sum(['region', 'idc' , 'instance']).instance(['idc'] , ['region'])",
meterSystem
);
@@ -178,6 +180,7 @@ public class AnalyzerTest {
public void testHistogramPercentile() {
analyzer = Analyzer.build(
"instance_cpu_percentage",
+ null,
"instance_cpu_percentage.sum(['le' , 'service' , 'instance']).histogram().histogram_percentile([75,99]).service(['service'])",
meterSystem
);
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
new file mode 100644
index 0000000..d8ebdd5
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import static com.google.common.collect.ImmutableMap.of;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RunWith(Parameterized.class)
+public class FilterTest {
+ @Parameterized.Parameter
+ public String name;
+
+ @Parameterized.Parameter(1)
+ public ImmutableMap<String, SampleFamily> input;
+
+ @Parameterized.Parameter(2)
+ public String expression;
+
+ @Parameterized.Parameter(3)
+ public Result want;
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<Object[]> data() {
+ final SampleFamily sf =
+ SampleFamilyBuilder.newBuilder(
+ Sample.builder()
+ .value(1600592418480.0)
+ .labels(ImmutableMap.of("str", "val1"))
+ .name("instance_cpu_percentage")
+ .build(),
+ Sample.builder()
+ .value(1600592418480.0)
+ .labels(ImmutableMap.of("str", "val2"))
+ .name("instance_cpu_percentage")
+ .build())
+ .build();
+ return Arrays.asList(new Object[][]{
+ {
+ "filter-string",
+ of("instance_cpu_percentage", sf),
+ "instance_cpu_percentage.filter({ tags -> tags.str == 'val1' })",
+ Result.success(SampleFamily.build(sf.context, sf.samples[0]))
+ },
+ {
+ "filter-none",
+ of("instance_cpu_percentage", sf),
+ "instance_cpu_percentage.filter({ tags -> tags.str == 'val2' })",
+ Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+ },
+ {
+ "filter-not-equal",
+ of("instance_cpu_percentage", sf),
+ "instance_cpu_percentage.filter({ tags -> tags.str != 'val1' })",
+ Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+ },
+ });
+ }
+
+ @Test
+ public void test() {
+ Expression e = DSL.parse(expression);
+ Result r = e.run(input);
+ assertThat(r, is(want));
+ }
+}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
index ab9af94..1fdf28a 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.otel.oc;
+import com.google.common.base.Strings;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.opencensus.proto.agent.common.v1.Node;
@@ -75,6 +76,10 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
nodeLabels.put("node_identifier_pid", String.valueOf(node.getIdentifier().getPid()));
}
}
+ final String name = node.getServiceInfo().getName();
+ if (!Strings.isNullOrEmpty(name)) {
+ nodeLabels.put("job_name", name);
+ }
}
metrics.forEach(m -> m.toMeter(request.getMetricsList().stream()
.flatMap(metric -> metric.getTimeseriesList().stream().map(timeSeries ->
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
index eebcf46..b167c5f 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
@@ -28,6 +28,7 @@ public class ZabbixConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
+ private String filter;
private Entities entities;
private List<String> requiredZabbixItemKeys;
private List<Metric> metrics;
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
index f6468a8..752a6bd 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
@@ -28,6 +28,7 @@
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
+filter: "{ tags -> tags.job_name == 'skywalking-so11y' }"
expSuffix: tag({tags -> tags.service = 'oap::' + tags.service}).instance(['service'], ['host_name'])
metricPrefix: meter_oap
metricsRules:
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
index a2d0437..b030150 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
@@ -28,6 +28,7 @@
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
+filter: "{ tags -> tags.job_name == 'vm-monitoring' }"
expSuffix: tag({tags -> tags.node_identifier_host_name = 'vm::' + tags.node_identifier_host_name}).service(['node_identifier_host_name'])
metricPrefix: meter_vm
metricsRules:
diff --git a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
index a0b06bc..770232e 100644
--- a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
+++ b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
@@ -17,7 +17,7 @@ receivers:
prometheus:
config:
scrape_configs:
- - job_name: 'otel-collector'
+ - job_name: 'vm-monitoring' # make sure to use this in the vm.yaml to filter only VM metrics
scrape_interval: 10s
static_configs:
- targets: [ 'vm-service:9100' ]