You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/11/19 11:45:33 UTC

[skywalking] 01/01: Add filter mechanism in MAL core and fix some bugs

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch mal/enhance
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 15f7e0039d0b0846622c121f1bd5b24dd8a1b2fc
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Nov 19 19:45:13 2021 +0800

    Add filter mechanism in MAL core and fix some bugs
    
    Also fix the following minor bugs:
    
    * Fix concurrency bug in MAL `increase`-related calculation.
    * Fix a null pointer bug when building `SampleFamily`.
    
    Closes https://github.com/apache/skywalking/issues/8145
---
 CHANGES.md                                         |   3 +
 docs/en/setup/backend/backend-telemetry.md         |   2 +-
 .../provider/meter/config/MeterConfig.java         |   1 +
 .../provider/meter/process/MeterProcessor.java     |   2 +-
 .../skywalking/oap/meter/analyzer/Analyzer.java    |  26 +++-
 .../oap/meter/analyzer/MetricConvert.java          |   1 +
 .../oap/meter/analyzer/MetricRuleConfig.java       |   2 +
 .../oap/meter/analyzer/dsl/Expression.java         |   9 +-
 .../oap/meter/analyzer/dsl/FilterExpression.java   |  54 ++++++++
 .../oap/meter/analyzer/dsl/SampleFamily.java       | 138 +++++++++++++++------
 .../meter/analyzer/dsl/counter/CounterWindow.java  |  15 +--
 .../oap/meter/analyzer/prometheus/rule/Rule.java   |   1 +
 .../oap/meter/analyzer/dsl/AnalyzerTest.java       |   3 +
 .../oap/meter/analyzer/dsl/FilterTest.java         |  95 ++++++++++++++
 .../server/receiver/otel/oc/OCMetricHandler.java   |   5 +
 .../zabbix/provider/config/ZabbixConfig.java       |   1 +
 .../src/main/resources/otel-oc-rules/oap.yaml      |   1 +
 .../src/main/resources/otel-oc-rules/vm.yaml       |   1 +
 .../otel-collector-config.yaml                     |   2 +-
 19 files changed, 298 insertions(+), 64 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3d6738e..304a2ab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,9 @@ Release Notes.
 * Add `MicroBench` module to make it easier for developers to write JMH test.
 * Upgrade Kubernetes Java client to 14.0.0, supports GCP token refreshing and fixes some bugs.
 * Change `SO11Y` metric `envoy_als_in_count` to calculate the ALS message count.
+* Add filter mechanism in MAL core to filter metrics.
+* Fix concurrency bug in MAL `increase`-related calculation.
+* Fix a null pointer bug when building `SampleFamily`.
 
 #### UI
 
diff --git a/docs/en/setup/backend/backend-telemetry.md b/docs/en/setup/backend/backend-telemetry.md
index 5c7f8e9..e0a76d6 100644
--- a/docs/en/setup/backend/backend-telemetry.md
+++ b/docs/en/setup/backend/backend-telemetry.md
@@ -128,7 +128,7 @@ Set this up following these steps:
   ```
 2. Set up OpenTelemetry Collector and config a scrape job:
 ``` yaml
-- job_name: 'skywalking'
+- job_name: 'skywalking-so11y' # make sure to use this in the so11y.yaml to filter only so11y metrics
   metrics_path: '/metrics'
   kubernetes_sd_configs:
   - role: pod
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
index 121af54..c2e8930 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
@@ -29,6 +29,7 @@ import java.util.List;
 public class MeterConfig implements MetricRuleConfig {
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private List<Rule> metricsRules;
 
     @Data
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
index 14a41e1..698075d 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
@@ -133,7 +133,7 @@ public class MeterProcessor {
         }
 
         try {
-            converts.stream().forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
+            converts.forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
                 Map.Entry::getKey,
                 v -> SampleFamilyBuilder.newBuilder(
                     v.getValue().stream().map(s -> s.build(service, serviceInstance, timestamp)).toArray(Sample[]::new)
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 2770f88..58c9e53 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.meter.analyzer;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
@@ -36,6 +37,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
 import org.apache.skywalking.oap.meter.analyzer.dsl.DownsamplingType;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
 import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingContext;
+import org.apache.skywalking.oap.meter.analyzer.dsl.FilterExpression;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
@@ -56,6 +58,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileA
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
@@ -74,11 +77,17 @@ public class Analyzer {
 
     public static final Tuple2<String, SampleFamily> NIL = Tuple.of("", null);
 
-    public static Analyzer build(final String metricName, final String expression,
+    public static Analyzer build(final String metricName,
+                                 final String filterExpression,
+                                 final String expression,
                                  final MeterSystem meterSystem) {
         Expression e = DSL.parse(expression);
+        FilterExpression filter = null;
+        if (!Strings.isNullOrEmpty(filterExpression)) {
+            filter = new FilterExpression(filterExpression);
+        }
         ExpressionParsingContext ctx = e.parse();
-        Analyzer analyzer = new Analyzer(metricName, e, meterSystem);
+        Analyzer analyzer = new Analyzer(metricName, filter, e, meterSystem);
         analyzer.init(ctx);
         return analyzer;
     }
@@ -89,6 +98,8 @@ public class Analyzer {
 
     private final String metricName;
 
+    private final FilterExpression filterExpression;
+
     private final Expression expression;
 
     private final MeterSystem meterSystem;
@@ -103,16 +114,19 @@ public class Analyzer {
      * @param sampleFamilies input samples.
      */
     public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
-        ImmutableMap<String, SampleFamily> input = samples.stream()
-                                                          .map(s -> Tuple.of(s, sampleFamilies.get(s)))
-                                                          .filter(t -> t._2 != null)
-                                                          .collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+        Map<String, SampleFamily> input = samples.stream()
+                                                 .map(s -> Tuple.of(s, sampleFamilies.get(s)))
+                                                 .filter(t -> t._2 != null)
+                                                 .collect(toImmutableMap(t -> t._1, t -> t._2));
         if (input.size() < 1) {
             if (log.isDebugEnabled()) {
                 log.debug("{} is ignored due to the lack of {}", expression, samples);
             }
             return;
         }
+        if (filterExpression != null) {
+            input = filterExpression.filter(input);
+        }
         Result r = expression.run(input);
         if (!r.isSuccess()) {
             return;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
index c7bec66..07ff140 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
@@ -51,6 +51,7 @@ public class MetricConvert {
         this.analyzers = rule.getMetricsRules().stream().map(
             r -> Analyzer.build(
                 formatMetricName(rule, r.getName()),
+                rule.getFilter(),
                 Strings.isNullOrEmpty(rule.getExpSuffix()) ?
                     r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
                 service
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
index 64f96cc..831f4c8 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
@@ -40,6 +40,8 @@ public interface MetricRuleConfig {
      */
     List<? extends RuleConfig> getMetricsRules();
 
+    String getFilter();
+
     interface RuleConfig {
         /**
          * Get definition metrics name
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index e9a0f6e..dce4af3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,6 +23,7 @@ import groovy.lang.ExpandoMetaClass;
 import groovy.lang.GroovyObjectSupport;
 import groovy.util.DelegatingScript;
 import java.time.Instant;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,7 @@ public class Expression {
 
     private final DelegatingScript expression;
 
-    private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+    private final ThreadLocal<Map<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
 
     public Expression(final String literal, final DelegatingScript expression) {
         this.literal = literal;
@@ -71,7 +72,7 @@ public class Expression {
      * @param sampleFamilies a data map includes all of candidates to be analysis.
      * @return The result of execution.
      */
-    public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+    public Result run(final Map<String, SampleFamily> sampleFamilies) {
         propertyRepository.set(sampleFamilies);
         try {
             SampleFamily sf = (SampleFamily) expression.run();
@@ -114,7 +115,7 @@ public class Expression {
         public static final DownsamplingType LATEST = DownsamplingType.LATEST;
 
         private final String literal;
-        private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository;
+        private final ThreadLocal<Map<String, SampleFamily>> propertyRepository;
 
         public SampleFamily propertyMissing(String metricName) {
             ExpressionParsingContext.get().ifPresent(ctx -> {
@@ -122,7 +123,7 @@ public class Expression {
                     ctx.samples.add(metricName);
                 }
             });
-            ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+            Map<String, SampleFamily> sampleFamilies = propertyRepository.get();
             if (sampleFamilies == null) {
                 return SampleFamily.EMPTY;
             }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
new file mode 100644
index 0000000..69ffe9b
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import groovy.lang.Closure;
+import groovy.lang.GroovyShell;
+import java.util.Map;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import static java.util.stream.Collectors.toMap;
+
+@Slf4j
+@ToString(of = {"literal"})
+public class FilterExpression {
+    private final String literal;
+    private final Closure<Boolean> filterClosure;
+
+    @SuppressWarnings("unchecked")
+    public FilterExpression(final String literal) {
+        this.literal = literal;
+
+        GroovyShell sh = new GroovyShell();
+        filterClosure = (Closure<Boolean>) sh.evaluate(literal);
+    }
+
+    public Map<String, SampleFamily> filter(final Map<String, SampleFamily> sampleFamilies) {
+        try {
+            return sampleFamilies.entrySet().stream().collect(toMap(
+                Map.Entry::getKey,
+                it -> it.getValue().filter(filterClosure)
+            ));
+        } catch (Throwable t) {
+            log.error("failed to run \"{}\"", literal, t);
+        }
+        return sampleFamilies;
+    }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index c153033..6be2740 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -18,22 +18,13 @@
 
 package org.apache.skywalking.oap.meter.analyzer.dsl;
 
-import com.google.common.base.CharMatcher;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.AtomicDouble;
-import groovy.lang.Closure;
-import io.vavr.Function2;
-import io.vavr.Function3;
-import lombok.AccessLevel;
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
+import static java.util.function.UnaryOperator.identity;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
@@ -43,6 +34,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
 
 import java.util.Arrays;
 import java.util.Comparator;
@@ -57,13 +49,25 @@ import java.util.function.DoubleBinaryOperator;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
 
-import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static java.util.function.UnaryOperator.identity;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+
+import groovy.lang.Closure;
+import io.vavr.Function2;
+import io.vavr.Function3;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * SampleFamily represents a collection of {@link Sample}.
@@ -71,13 +75,17 @@ import static java.util.stream.Collectors.toList;
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 @EqualsAndHashCode
 @ToString
+@Slf4j
 public class SampleFamily {
     public static final SampleFamily EMPTY = new SampleFamily(new Sample[0], RunningContext.EMPTY);
 
     static SampleFamily build(RunningContext ctx, Sample... samples) {
         Preconditions.checkNotNull(samples);
-        samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
         Preconditions.checkArgument(samples.length > 0);
+        samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
+        if (samples.length == 0) {
+            return EMPTY;
+        }
         return new SampleFamily(samples, Optional.ofNullable(ctx).orElseGet(RunningContext::instance));
     }
 
@@ -332,6 +340,19 @@ public class SampleFamily {
         );
     }
 
+    public SampleFamily filter(Closure<Boolean> filter) {
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        final Sample[] filtered = Arrays.stream(samples)
+                                        .filter(it -> filter.call(it.labels))
+                                        .toArray(Sample[]::new);
+        if (filtered.length == 0) {
+            return EMPTY;
+        }
+        return SampleFamily.build(context, filtered);
+    }
+
     /* k8s retags*/
     public SampleFamily retagByK8sMeta(String newLabelName,
                                        K8sRetagType type,
@@ -467,8 +488,9 @@ public class SampleFamily {
                                   mapping(identity(), toList())
               ))
               .forEach((labels, samples) -> {
-                  MeterEntity meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
-                  meterSamples.put(meterEntity, InternalOps.left(samples, entityDescription.getLabelKeys()));
+                  Optional<MeterEntity> meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
+                  meterEntity.ifPresent(it -> meterSamples.put(
+                      it, InternalOps.left(samples, entityDescription.getLabelKeys())));
               });
 
         this.context.setMeterSamples(meterSamples);
@@ -577,34 +599,68 @@ public class SampleFamily {
             return CharMatcher.is('.').trimFrom(name);
         }
 
-        private static MeterEntity buildMeterEntity(List<Sample> samples,
-                                                    EntityDescription entityDescription) {
+        private static Optional<MeterEntity> buildMeterEntity(List<Sample> samples,
+                                                              EntityDescription entityDescription) {
+            final String serviceName;
+            final String serviceInstanceName;
+            final String endpointName;
             switch (entityDescription.getScopeType()) {
                 case SERVICE:
                     ServiceEntityDescription serviceEntityDescription = (ServiceEntityDescription) entityDescription;
-                    return MeterEntity.newService(InternalOps.dim(samples, serviceEntityDescription.getServiceKeys()));
+                    serviceName = InternalOps.dim(samples, serviceEntityDescription.getServiceKeys());
+                    if (Strings.isNullOrEmpty(serviceName)) {
+                        log.warn(
+                            "service built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newService(serviceName));
                 case SERVICE_INSTANCE:
                     InstanceEntityDescription instanceEntityDescription = (InstanceEntityDescription) entityDescription;
-                    return MeterEntity.newServiceInstance(
-                        InternalOps.dim(samples, instanceEntityDescription.getServiceKeys()),
-                        InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys())
-                    );
+                    serviceName = InternalOps.dim(samples, instanceEntityDescription.getServiceKeys());
+                    serviceInstanceName = InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys());
+                    if (Strings.isNullOrEmpty(serviceName) ||
+                        Strings.isNullOrEmpty(serviceInstanceName)) {
+                        log.warn(
+                            "service([{}]) or instance([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            serviceName, serviceInstanceName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newServiceInstance(serviceName, serviceInstanceName));
                 case ENDPOINT:
                     EndpointEntityDescription endpointEntityDescription = (EndpointEntityDescription) entityDescription;
-                    return MeterEntity.newEndpoint(
-                        InternalOps.dim(samples, endpointEntityDescription.getServiceKeys()),
-                        InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys())
-                    );
+                    serviceName = InternalOps.dim(samples, endpointEntityDescription.getServiceKeys());
+                    endpointName = InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys());
+                    if (Strings.isNullOrEmpty(serviceName) ||
+                        Strings.isNullOrEmpty(endpointName)) {
+                        log.warn(
+                            "service([{}]) or endpoint([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            serviceName, endpointName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newEndpoint(serviceName, endpointName));
                 case SERVICE_RELATION:
                     ServiceRelationEntityDescription serviceRelationEntityDescription = (ServiceRelationEntityDescription) entityDescription;
-                    return MeterEntity.newServiceRelation(
-                        InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys()),
-                        InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys()),
+                    String sourceServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys());
+                    String destServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys());
+                    if (Strings.isNullOrEmpty(sourceServiceName) ||
+                        Strings.isNullOrEmpty(destServiceName)) {
+                        log.warn(
+                            "source service([{}]) or dest service([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            sourceServiceName, destServiceName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newServiceRelation(
+                        sourceServiceName, destServiceName,
                         serviceRelationEntityDescription.getDetectPoint()
-                    );
+                    ));
                 default:
                     throw new UnexpectedException(
-                        "Unexpected scope type of entityDescription " + entityDescription.toString());
+                        "Unexpected scope type of entityDescription " + entityDescription);
             }
         }
 
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index ecdc329..b1e4eac 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -19,12 +19,12 @@
 package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
 import lombok.RequiredArgsConstructor;
@@ -42,16 +42,12 @@ public class CounterWindow {
 
     public static final CounterWindow INSTANCE = new CounterWindow();
 
-    private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
-    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
+    private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
+    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();
 
     public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
         ID id = new ID(name, labels);
-        if (!windows.containsKey(id)) {
-            windows.put(id, new PriorityQueue<>());
-        }
-
-        Queue<Tuple2<Long, Double>> window = windows.get(id);
+        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
         window.offer(Tuple.of(now, value));
         long waterLevel = now - windowSize;
         Tuple2<Long, Double> peek = window.peek();
@@ -77,8 +73,7 @@ public class CounterWindow {
         ID id = new ID(name, labels);
 
         Tuple2<Long, Double> element = Tuple.of(now, value);
-        Tuple2<Long, Double> result = lastElementMap.get(id);
-        lastElementMap.put(id, element);
+        Tuple2<Long, Double> result = lastElementMap.put(id, element);
         if (result == null) {
             return element;
         }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
index 4cb24a8..ea4e78a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
@@ -37,5 +37,6 @@ public class Rule implements MetricRuleConfig {
     private StaticConfig staticConfig;
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private List<MetricsRule> metricsRules;
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
index e5e1813..4223548 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
@@ -87,6 +87,7 @@ public class AnalyzerTest {
     public void testSingle() {
         analyzer = Analyzer.build(
             "sum_service_instance",
+            null,
             "http_success_request.sum(['region', 'idc']).instance(['idc'] , ['region'])",
             meterSystem
         );
@@ -130,6 +131,7 @@ public class AnalyzerTest {
     public void testLabeled() {
         analyzer = Analyzer.build(
             "sum_service_instance_labels",
+            null,
             "http_success_request.sum(['region', 'idc' , 'instance']).instance(['idc'] , ['region'])",
             meterSystem
         );
@@ -178,6 +180,7 @@ public class AnalyzerTest {
     public void testHistogramPercentile() {
         analyzer = Analyzer.build(
             "instance_cpu_percentage",
+            null,
             "instance_cpu_percentage.sum(['le' , 'service' , 'instance']).histogram().histogram_percentile([75,99]).service(['service'])",
             meterSystem
         );
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
new file mode 100644
index 0000000..d8ebdd5
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import static com.google.common.collect.ImmutableMap.of;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Parameterized tests for the MAL {@code filter} function.
+ *
+ * <p>Each case parses one expression, runs it against {@link #input} and compares the outcome
+ * with {@link #want}.
+ */
+@Slf4j
+@RunWith(Parameterized.class)
+public class FilterTest {
+    /** Case label; substituted for {@code {0}} in the generated test name. */
+    @Parameterized.Parameter
+    public String name;
+
+    /** Sample families passed to the expression, keyed by the name the expression refers to. */
+    @Parameterized.Parameter(1)
+    public ImmutableMap<String, SampleFamily> input;
+
+    /** The MAL expression under test. */
+    @Parameterized.Parameter(2)
+    public String expression;
+
+    /** The result the expression is expected to produce. */
+    @Parameterized.Parameter(3)
+    public Result want;
+
+    /**
+     * Builds the test cases as {@code {name, input, expression, want}} rows.
+     *
+     * @return one row per case, in the order of the {@code @Parameterized.Parameter} indices
+     */
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> data() {
+        // Two samples that differ only in the value of the "str" label, so each filter below
+        // is expected to keep exactly one of them.
+        final SampleFamily sf =
+            SampleFamilyBuilder.newBuilder(
+                                   Sample.builder()
+                                         .value(1600592418480.0)
+                                         .labels(ImmutableMap.of("str", "val1"))
+                                         .name("instance_cpu_percentage")
+                                         .build(),
+                                   Sample.builder()
+                                         .value(1600592418480.0)
+                                         .labels(ImmutableMap.of("str", "val2"))
+                                         .name("instance_cpu_percentage")
+                                         .build())
+                               .build();
+        return Arrays.asList(new Object[][]{
+            {
+                "filter-string",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str == 'val1' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[0]))
+            },
+            {
+                "filter-string-second-value",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str == 'val2' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+            },
+            {
+                "filter-not-equal",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str != 'val1' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+            },
+        });
+    }
+
+    /** Parses the expression, runs it against the input and asserts the result equals {@code want}. */
+    @Test
+    public void test() {
+        Expression e = DSL.parse(expression);
+        Result r = e.run(input);
+        assertThat(r, is(want));
+    }
+}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
index ab9af94..1fdf28a 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.receiver.otel.oc;
 
+import com.google.common.base.Strings;
 import com.google.protobuf.Timestamp;
 import io.grpc.stub.StreamObserver;
 import io.opencensus.proto.agent.common.v1.Node;
@@ -75,6 +76,10 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
                             nodeLabels.put("node_identifier_pid", String.valueOf(node.getIdentifier().getPid()));
                         }
                     }
+                    final String name = node.getServiceInfo().getName();
+                    if (!Strings.isNullOrEmpty(name)) {
+                        nodeLabels.put("job_name", name);
+                    }
                 }
                 metrics.forEach(m -> m.toMeter(request.getMetricsList().stream()
                     .flatMap(metric -> metric.getTimeseriesList().stream().map(timeSeries ->
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
index eebcf46..b167c5f 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
@@ -28,6 +28,7 @@ public class ZabbixConfig implements MetricRuleConfig {
 
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private Entities entities;
     private List<String> requiredZabbixItemKeys;
     private List<Metric> metrics;
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
index f6468a8..752a6bd 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+filter: "{ tags -> tags.job_name == 'skywalking-so11y' }"
 expSuffix: tag({tags -> tags.service = 'oap::' + tags.service}).instance(['service'], ['host_name'])
 metricPrefix: meter_oap
 metricsRules:
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
index a2d0437..b030150 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+filter: "{ tags -> tags.job_name == 'vm-monitoring' }"
 expSuffix: tag({tags -> tags.node_identifier_host_name = 'vm::' + tags.node_identifier_host_name}).service(['node_identifier_host_name'])
 metricPrefix: meter_vm
 metricsRules:
diff --git a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
index a0b06bc..770232e 100644
--- a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
+++ b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
@@ -17,7 +17,7 @@ receivers:
   prometheus:
     config:
       scrape_configs:
-        - job_name: 'otel-collector'
+        - job_name: 'vm-monitoring' # must match the job_name that the filter in otel-oc-rules/vm.yaml checks, so only VM metrics are kept
           scrape_interval: 10s
           static_configs:
             - targets: [ 'vm-service:9100' ]