Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/11/19 11:46:22 UTC

[skywalking] branch mal/enhance updated (15f7e00 -> a37dc93)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch mal/enhance
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


 discard 15f7e00  Add filter mechanism in MAL core and fix some bugs
     add 62d17d3  Support Istio 1.10.3, 1.11.4, 1.12.0 release (#8155)
     new a37dc93  Add filter mechanism in MAL core and fix some bugs

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (15f7e00)
            \
             N -- N -- N   refs/heads/mal/enhance (a37dc93)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/e2e.istio.yaml | 2 +-
 CHANGES.md                       | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

[skywalking] 01/01: Add filter mechanism in MAL core and fix some bugs

Posted by ke...@apache.org.

kezhenxu94 pushed a commit to branch mal/enhance
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a37dc93e832964110efdb197ce757832d747bf95
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Nov 19 19:45:13 2021 +0800

    Add filter mechanism in MAL core and fix some bugs
    
    Also fixed some minor bugs
    
    * Fix concurrency bug in MAL `increase`-related calculation.
    * Fix a null pointer bug when building `SampleFamily`.
    
    Closes https://github.com/apache/skywalking/issues/8145
---
 CHANGES.md                                         |   3 +
 docs/en/setup/backend/backend-telemetry.md         |   2 +-
 .../provider/meter/config/MeterConfig.java         |   1 +
 .../provider/meter/process/MeterProcessor.java     |   2 +-
 .../skywalking/oap/meter/analyzer/Analyzer.java    |  26 +++-
 .../oap/meter/analyzer/MetricConvert.java          |   1 +
 .../oap/meter/analyzer/MetricRuleConfig.java       |   2 +
 .../oap/meter/analyzer/dsl/Expression.java         |   9 +-
 .../oap/meter/analyzer/dsl/FilterExpression.java   |  54 ++++++++
 .../oap/meter/analyzer/dsl/SampleFamily.java       | 138 +++++++++++++++------
 .../meter/analyzer/dsl/counter/CounterWindow.java  |  15 +--
 .../oap/meter/analyzer/prometheus/rule/Rule.java   |   1 +
 .../oap/meter/analyzer/dsl/AnalyzerTest.java       |   3 +
 .../oap/meter/analyzer/dsl/FilterTest.java         |  95 ++++++++++++++
 .../server/receiver/otel/oc/OCMetricHandler.java   |   5 +
 .../zabbix/provider/config/ZabbixConfig.java       |   1 +
 .../src/main/resources/otel-oc-rules/oap.yaml      |   1 +
 .../src/main/resources/otel-oc-rules/vm.yaml       |   1 +
 .../otel-collector-config.yaml                     |   2 +-
 19 files changed, 298 insertions(+), 64 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7e9b3d5..86f2242 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -51,6 +51,9 @@ Release Notes.
 * Upgrade Kubernetes Java client to 14.0.0, supports GCP token refreshing and fixes some bugs.
 * Change `SO11Y` metric `envoy_als_in_count` to calculate the ALS message count.
 * Support Istio `1.10.3`, `1.11.4`, `1.12.0` release.(Tested through e2e)
+* Add filter mechanism in MAL core to filter metrics.
+* Fix concurrency bug in MAL `increase`-related calculation.
+* Fix a null pointer bug when building `SampleFamily`.
 
 #### UI
 
diff --git a/docs/en/setup/backend/backend-telemetry.md b/docs/en/setup/backend/backend-telemetry.md
index 5c7f8e9..e0a76d6 100644
--- a/docs/en/setup/backend/backend-telemetry.md
+++ b/docs/en/setup/backend/backend-telemetry.md
@@ -128,7 +128,7 @@ Set this up following these steps:
   ```
 2. Set up OpenTelemetry Collector and config a scrape job:
 ``` yaml
-- job_name: 'skywalking'
+- job_name: 'skywalking-so11y' # make sure to use this in the so11y.yaml to filter only so11y metrics
   metrics_path: '/metrics'
   kubernetes_sd_configs:
   - role: pod
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
index 121af54..c2e8930 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/config/MeterConfig.java
@@ -29,6 +29,7 @@ import java.util.List;
 public class MeterConfig implements MetricRuleConfig {
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private List<Rule> metricsRules;
 
     @Data
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
index 14a41e1..698075d 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterProcessor.java
@@ -133,7 +133,7 @@ public class MeterProcessor {
         }
 
         try {
-            converts.stream().forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
+            converts.forEach(convert -> convert.toMeter(meters.entrySet().stream().collect(toImmutableMap(
                 Map.Entry::getKey,
                 v -> SampleFamilyBuilder.newBuilder(
                     v.getValue().stream().map(s -> s.build(service, serviceInstance, timestamp)).toArray(Sample[]::new)
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 2770f88..58c9e53 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.meter.analyzer;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
@@ -36,6 +37,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
 import org.apache.skywalking.oap.meter.analyzer.dsl.DownsamplingType;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
 import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingContext;
+import org.apache.skywalking.oap.meter.analyzer.dsl.FilterExpression;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
@@ -56,6 +58,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileA
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
@@ -74,11 +77,17 @@ public class Analyzer {
 
     public static final Tuple2<String, SampleFamily> NIL = Tuple.of("", null);
 
-    public static Analyzer build(final String metricName, final String expression,
+    public static Analyzer build(final String metricName,
+                                 final String filterExpression,
+                                 final String expression,
                                  final MeterSystem meterSystem) {
         Expression e = DSL.parse(expression);
+        FilterExpression filter = null;
+        if (!Strings.isNullOrEmpty(filterExpression)) {
+            filter = new FilterExpression(filterExpression);
+        }
         ExpressionParsingContext ctx = e.parse();
-        Analyzer analyzer = new Analyzer(metricName, e, meterSystem);
+        Analyzer analyzer = new Analyzer(metricName, filter, e, meterSystem);
         analyzer.init(ctx);
         return analyzer;
     }
@@ -89,6 +98,8 @@ public class Analyzer {
 
     private final String metricName;
 
+    private final FilterExpression filterExpression;
+
     private final Expression expression;
 
     private final MeterSystem meterSystem;
@@ -103,16 +114,19 @@ public class Analyzer {
      * @param sampleFamilies input samples.
      */
     public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
-        ImmutableMap<String, SampleFamily> input = samples.stream()
-                                                          .map(s -> Tuple.of(s, sampleFamilies.get(s)))
-                                                          .filter(t -> t._2 != null)
-                                                          .collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+        Map<String, SampleFamily> input = samples.stream()
+                                                 .map(s -> Tuple.of(s, sampleFamilies.get(s)))
+                                                 .filter(t -> t._2 != null)
+                                                 .collect(toImmutableMap(t -> t._1, t -> t._2));
         if (input.size() < 1) {
             if (log.isDebugEnabled()) {
                 log.debug("{} is ignored due to the lack of {}", expression, samples);
             }
             return;
         }
+        if (filterExpression != null) {
+            input = filterExpression.filter(input);
+        }
         Result r = expression.run(input);
         if (!r.isSuccess()) {
             return;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
index c7bec66..07ff140 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricConvert.java
@@ -51,6 +51,7 @@ public class MetricConvert {
         this.analyzers = rule.getMetricsRules().stream().map(
             r -> Analyzer.build(
                 formatMetricName(rule, r.getName()),
+                rule.getFilter(),
                 Strings.isNullOrEmpty(rule.getExpSuffix()) ?
                     r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
                 service
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
index 64f96cc..831f4c8 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/MetricRuleConfig.java
@@ -40,6 +40,8 @@ public interface MetricRuleConfig {
      */
     List<? extends RuleConfig> getMetricsRules();
 
+    String getFilter();
+
     interface RuleConfig {
         /**
          * Get definition metrics name
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index e9a0f6e..dce4af3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,6 +23,7 @@ import groovy.lang.ExpandoMetaClass;
 import groovy.lang.GroovyObjectSupport;
 import groovy.util.DelegatingScript;
 import java.time.Instant;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,7 @@ public class Expression {
 
     private final DelegatingScript expression;
 
-    private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+    private final ThreadLocal<Map<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
 
     public Expression(final String literal, final DelegatingScript expression) {
         this.literal = literal;
@@ -71,7 +72,7 @@ public class Expression {
      * @param sampleFamilies a data map includes all of candidates to be analysis.
      * @return The result of execution.
      */
-    public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+    public Result run(final Map<String, SampleFamily> sampleFamilies) {
         propertyRepository.set(sampleFamilies);
         try {
             SampleFamily sf = (SampleFamily) expression.run();
@@ -114,7 +115,7 @@ public class Expression {
         public static final DownsamplingType LATEST = DownsamplingType.LATEST;
 
         private final String literal;
-        private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository;
+        private final ThreadLocal<Map<String, SampleFamily>> propertyRepository;
 
         public SampleFamily propertyMissing(String metricName) {
             ExpressionParsingContext.get().ifPresent(ctx -> {
@@ -122,7 +123,7 @@ public class Expression {
                     ctx.samples.add(metricName);
                 }
             });
-            ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+            Map<String, SampleFamily> sampleFamilies = propertyRepository.get();
             if (sampleFamilies == null) {
                 return SampleFamily.EMPTY;
             }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
new file mode 100644
index 0000000..69ffe9b
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import groovy.lang.Closure;
+import groovy.lang.GroovyShell;
+import java.util.Map;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import static java.util.stream.Collectors.toMap;
+
+@Slf4j
+@ToString(of = {"literal"})
+public class FilterExpression {
+    private final String literal;
+    private final Closure<Boolean> filterClosure;
+
+    @SuppressWarnings("unchecked")
+    public FilterExpression(final String literal) {
+        this.literal = literal;
+
+        GroovyShell sh = new GroovyShell();
+        filterClosure = (Closure<Boolean>) sh.evaluate(literal);
+    }
+
+    public Map<String, SampleFamily> filter(final Map<String, SampleFamily> sampleFamilies) {
+        try {
+            return sampleFamilies.entrySet().stream().collect(toMap(
+                Map.Entry::getKey,
+                it -> it.getValue().filter(filterClosure)
+            ));
+        } catch (Throwable t) {
+            log.error("failed to run \"{}\"", literal, t);
+        }
+        return sampleFamilies;
+    }
+}
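
The filter step above evaluates a Groovy closure against each sample's labels and keeps only the samples for which it returns true. A minimal sketch of that idea follows, assuming a plain java.util.function.Predicate in place of the Groovy Closure<Boolean> so that it runs without Groovy on the classpath. The names LabelFilterSketch, Sample and the "job_name" label key are hypothetical and are not the SkyWalking classes or a confirmed label name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration only; not the SkyWalking classes.
final class LabelFilterSketch {

    /** A sample reduced to its labels and a value. */
    static final class Sample {
        final Map<String, String> labels;
        final double value;

        Sample(Map<String, String> labels, double value) {
            this.labels = labels;
            this.value = value;
        }
    }

    /** For every metric name, keeps only the samples whose labels satisfy the predicate. */
    static Map<String, List<Sample>> filter(Map<String, List<Sample>> families,
                                            Predicate<Map<String, String>> labelFilter) {
        Map<String, List<Sample>> result = new LinkedHashMap<>();
        families.forEach((name, samples) -> result.put(
            name,
            samples.stream()
                   .filter(s -> labelFilter.test(s.labels))
                   .collect(Collectors.toList())
        ));
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Sample>> input = new LinkedHashMap<>();
        input.put("http_requests", Arrays.asList(
            new Sample(Collections.singletonMap("job_name", "skywalking-so11y"), 1.0),
            new Sample(Collections.singletonMap("job_name", "other"), 2.0)
        ));

        // Assumed label key; the real rule files may filter on a different label.
        Map<String, List<Sample>> kept =
            filter(input, labels -> "skywalking-so11y".equals(labels.get("job_name")));

        System.out.println("kept samples: " + kept.get("http_requests").size());
    }
}
```

Unlike this sketch, the patch returns the unfiltered input when the closure throws, and it maps a family with no surviving samples to SampleFamily.EMPTY rather than to an empty list.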
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index c153033..6be2740 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -18,22 +18,13 @@
 
 package org.apache.skywalking.oap.meter.analyzer.dsl;
 
-import com.google.common.base.CharMatcher;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.AtomicDouble;
-import groovy.lang.Closure;
-import io.vavr.Function2;
-import io.vavr.Function3;
-import lombok.AccessLevel;
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
+import static java.util.function.UnaryOperator.identity;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
 import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
@@ -43,6 +34,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
 
 import java.util.Arrays;
 import java.util.Comparator;
@@ -57,13 +49,25 @@ import java.util.function.DoubleBinaryOperator;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
 
-import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static java.util.function.UnaryOperator.identity;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+
+import groovy.lang.Closure;
+import io.vavr.Function2;
+import io.vavr.Function3;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * SampleFamily represents a collection of {@link Sample}.
@@ -71,13 +75,17 @@ import static java.util.stream.Collectors.toList;
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 @EqualsAndHashCode
 @ToString
+@Slf4j
 public class SampleFamily {
     public static final SampleFamily EMPTY = new SampleFamily(new Sample[0], RunningContext.EMPTY);
 
     static SampleFamily build(RunningContext ctx, Sample... samples) {
         Preconditions.checkNotNull(samples);
-        samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
         Preconditions.checkArgument(samples.length > 0);
+        samples = Arrays.stream(samples).filter(sample -> !Double.isNaN(sample.getValue())).toArray(Sample[]::new);
+        if (samples.length == 0) {
+            return EMPTY;
+        }
         return new SampleFamily(samples, Optional.ofNullable(ctx).orElseGet(RunningContext::instance));
     }
 
@@ -332,6 +340,19 @@ public class SampleFamily {
         );
     }
 
+    public SampleFamily filter(Closure<Boolean> filter) {
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        final Sample[] filtered = Arrays.stream(samples)
+                                        .filter(it -> filter.call(it.labels))
+                                        .toArray(Sample[]::new);
+        if (filtered.length == 0) {
+            return EMPTY;
+        }
+        return SampleFamily.build(context, filtered);
+    }
+
     /* k8s retags*/
     public SampleFamily retagByK8sMeta(String newLabelName,
                                        K8sRetagType type,
@@ -467,8 +488,9 @@ public class SampleFamily {
                                   mapping(identity(), toList())
               ))
               .forEach((labels, samples) -> {
-                  MeterEntity meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
-                  meterSamples.put(meterEntity, InternalOps.left(samples, entityDescription.getLabelKeys()));
+                  Optional<MeterEntity> meterEntity = InternalOps.buildMeterEntity(samples, entityDescription);
+                  meterEntity.ifPresent(it -> meterSamples.put(
+                      it, InternalOps.left(samples, entityDescription.getLabelKeys())));
               });
 
         this.context.setMeterSamples(meterSamples);
@@ -577,34 +599,68 @@ public class SampleFamily {
             return CharMatcher.is('.').trimFrom(name);
         }
 
-        private static MeterEntity buildMeterEntity(List<Sample> samples,
-                                                    EntityDescription entityDescription) {
+        private static Optional<MeterEntity> buildMeterEntity(List<Sample> samples,
+                                                              EntityDescription entityDescription) {
+            final String serviceName;
+            final String serviceInstanceName;
+            final String endpointName;
             switch (entityDescription.getScopeType()) {
                 case SERVICE:
                     ServiceEntityDescription serviceEntityDescription = (ServiceEntityDescription) entityDescription;
-                    return MeterEntity.newService(InternalOps.dim(samples, serviceEntityDescription.getServiceKeys()));
+                    serviceName = InternalOps.dim(samples, serviceEntityDescription.getServiceKeys());
+                    if (Strings.isNullOrEmpty(serviceName)) {
+                        log.warn(
+                            "service built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newService(serviceName));
                 case SERVICE_INSTANCE:
                     InstanceEntityDescription instanceEntityDescription = (InstanceEntityDescription) entityDescription;
-                    return MeterEntity.newServiceInstance(
-                        InternalOps.dim(samples, instanceEntityDescription.getServiceKeys()),
-                        InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys())
-                    );
+                    serviceName = InternalOps.dim(samples, instanceEntityDescription.getServiceKeys());
+                    serviceInstanceName = InternalOps.dim(samples, instanceEntityDescription.getInstanceKeys());
+                    if (Strings.isNullOrEmpty(serviceName) ||
+                        Strings.isNullOrEmpty(serviceInstanceName)) {
+                        log.warn(
+                            "service([{}]) or instance([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            serviceName, serviceInstanceName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newServiceInstance(serviceName, serviceInstanceName));
                 case ENDPOINT:
                     EndpointEntityDescription endpointEntityDescription = (EndpointEntityDescription) entityDescription;
-                    return MeterEntity.newEndpoint(
-                        InternalOps.dim(samples, endpointEntityDescription.getServiceKeys()),
-                        InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys())
-                    );
+                    serviceName = InternalOps.dim(samples, endpointEntityDescription.getServiceKeys());
+                    endpointName = InternalOps.dim(samples, endpointEntityDescription.getEndpointKeys());
+                    if (Strings.isNullOrEmpty(serviceName) ||
+                        Strings.isNullOrEmpty(endpointName)) {
+                        log.warn(
+                            "service([{}]) or endpoint([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            serviceName, endpointName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newEndpoint(serviceName, endpointName));
                 case SERVICE_RELATION:
                     ServiceRelationEntityDescription serviceRelationEntityDescription = (ServiceRelationEntityDescription) entityDescription;
-                    return MeterEntity.newServiceRelation(
-                        InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys()),
-                        InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys()),
+                    String sourceServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getSourceServiceKeys());
+                    String destServiceName = InternalOps.dim(samples, serviceRelationEntityDescription.getDestServiceKeys());
+                    if (Strings.isNullOrEmpty(sourceServiceName) ||
+                        Strings.isNullOrEmpty(destServiceName)) {
+                        log.warn(
+                            "source service([{}]) or dest service([{}]) built from labels [{}] is empty, please check the labels are valid and their values are not empty",
+                            sourceServiceName, destServiceName, entityDescription
+                        );
+                        return Optional.empty();
+                    }
+                    return Optional.of(MeterEntity.newServiceRelation(
+                        sourceServiceName, destServiceName,
                         serviceRelationEntityDescription.getDetectPoint()
-                    );
+                    ));
                 default:
                     throw new UnexpectedException(
-                        "Unexpected scope type of entityDescription " + entityDescription.toString());
+                        "Unexpected scope type of entityDescription " + entityDescription);
             }
         }
 
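
The buildMeterEntity change above swaps a direct return for an Optional, so a sample group whose service (or instance, or endpoint) name resolves to an empty string is skipped instead of producing a broken entity. A minimal sketch of that pattern, assuming simplified string entities; OptionalEntitySketch, buildServiceEntity and collectEntities are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not the SkyWalking InternalOps class.
final class OptionalEntitySketch {

    /** Returns an entity name, or Optional.empty() when the label is missing or empty. */
    static Optional<String> buildServiceEntity(Map<String, String> labels, String serviceKey) {
        String serviceName = labels.get(serviceKey);
        if (serviceName == null || serviceName.isEmpty()) {
            // The patch logs a warning at this point; the sketch simply skips the group.
            return Optional.empty();
        }
        return Optional.of("service:" + serviceName);
    }

    /** Collects entities for all label sets, silently dropping those that cannot be built. */
    static List<String> collectEntities(List<Map<String, String>> labelSets, String serviceKey) {
        List<String> entities = new ArrayList<>();
        for (Map<String, String> labels : labelSets) {
            buildServiceEntity(labels, serviceKey).ifPresent(entities::add);
        }
        return entities;
    }

    public static void main(String[] args) {
        List<Map<String, String>> labelSets = Arrays.asList(
            Collections.singletonMap("service", "checkout"),
            Collections.singletonMap("service", ""),
            Collections.<String, String>emptyMap()
        );
        System.out.println(collectEntities(labelSets, "service"));
    }
}
```

The caller never sees a half-built entity: the empty case is handled at one place, the ifPresent call, which is the same shape the patch uses when filling meterSamples.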
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index ecdc329..b1e4eac 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -19,12 +19,12 @@
 package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
 import lombok.RequiredArgsConstructor;
@@ -42,16 +42,12 @@ public class CounterWindow {
 
     public static final CounterWindow INSTANCE = new CounterWindow();
 
-    private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
-    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
+    private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
+    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();
 
     public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
         ID id = new ID(name, labels);
-        if (!windows.containsKey(id)) {
-            windows.put(id, new PriorityQueue<>());
-        }
-
-        Queue<Tuple2<Long, Double>> window = windows.get(id);
+        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
         window.offer(Tuple.of(now, value));
         long waterLevel = now - windowSize;
         Tuple2<Long, Double> peek = window.peek();
@@ -77,8 +73,7 @@ public class CounterWindow {
         ID id = new ID(name, labels);
 
         Tuple2<Long, Double> element = Tuple.of(now, value);
-        Tuple2<Long, Double> result = lastElementMap.get(id);
-        lastElementMap.put(id, element);
+        Tuple2<Long, Double> result = lastElementMap.put(id, element);
         if (result == null) {
             return element;
         }
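
The CounterWindow change above replaces two check-then-act sequences on a HashMap (containsKey followed by put, and get followed by put) with single atomic calls on a ConcurrentHashMap. A minimal sketch of both calls follows. AtomicMapOpsSketch and its methods are hypothetical names, and the sketch deliberately uses a ConcurrentLinkedQueue instead of the PriorityQueue in the patch so that the concurrent adds in the demo are themselves thread-safe.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class for illustration only; not the SkyWalking CounterWindow.
final class AtomicMapOpsSketch {
    private final Map<String, Queue<Double>> windows = new ConcurrentHashMap<>();
    private final Map<String, Double> lastElements = new ConcurrentHashMap<>();

    /** Creates the queue at most once per id, even when called from several threads. */
    Queue<Double> windowOf(String id) {
        return windows.computeIfAbsent(id, unused -> new ConcurrentLinkedQueue<>());
    }

    /** Stores the new value and returns the previous one (null on first call) in one step. */
    Double swapLast(String id, double value) {
        return lastElements.put(id, value);
    }

    /** Adds perThread elements from each of the given number of threads and returns the total. */
    static int concurrentAdds(int threads, int perThread) {
        AtomicMapOpsSketch sketch = new AtomicMapOpsSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    sketch.windowOf("cpu").add(1.0);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sketch.windowOf("cpu").size();
    }

    public static void main(String[] args) {
        System.out.println("total elements: " + concurrentAdds(4, 1000));
    }
}
```

With a plain HashMap and the containsKey/put pair, two threads could each install their own queue for the same id and one set of elements would be lost; computeIfAbsent closes that window.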
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
index 4cb24a8..ea4e78a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/rule/Rule.java
@@ -37,5 +37,6 @@ public class Rule implements MetricRuleConfig {
     private StaticConfig staticConfig;
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private List<MetricsRule> metricsRules;
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
index e5e1813..4223548 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/AnalyzerTest.java
@@ -87,6 +87,7 @@ public class AnalyzerTest {
     public void testSingle() {
         analyzer = Analyzer.build(
             "sum_service_instance",
+            null,
             "http_success_request.sum(['region', 'idc']).instance(['idc'] , ['region'])",
             meterSystem
         );
@@ -130,6 +131,7 @@ public class AnalyzerTest {
     public void testLabeled() {
         analyzer = Analyzer.build(
             "sum_service_instance_labels",
+            null,
             "http_success_request.sum(['region', 'idc' , 'instance']).instance(['idc'] , ['region'])",
             meterSystem
         );
@@ -178,6 +180,7 @@ public class AnalyzerTest {
     public void testHistogramPercentile() {
         analyzer = Analyzer.build(
             "instance_cpu_percentage",
+            null,
             "instance_cpu_percentage.sum(['le' , 'service' , 'instance']).histogram().histogram_percentile([75,99]).service(['service'])",
             meterSystem
         );
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
new file mode 100644
index 0000000..d8ebdd5
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/FilterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import static com.google.common.collect.ImmutableMap.of;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RunWith(Parameterized.class)
+public class FilterTest {
+    @Parameterized.Parameter
+    public String name;
+
+    @Parameterized.Parameter(1)
+    public ImmutableMap<String, SampleFamily> input;
+
+    @Parameterized.Parameter(2)
+    public String expression;
+
+    @Parameterized.Parameter(3)
+    public Result want;
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> data() {
+        final SampleFamily sf =
+            SampleFamilyBuilder.newBuilder(
+                                   Sample.builder()
+                                         .value(1600592418480.0)
+                                         .labels(ImmutableMap.of("str", "val1"))
+                                         .name("instance_cpu_percentage")
+                                         .build(),
+                                   Sample.builder()
+                                         .value(1600592418480.0)
+                                         .labels(ImmutableMap.of("str", "val2"))
+                                         .name("instance_cpu_percentage")
+                                         .build())
+                               .build();
+        return Arrays.asList(new Object[][]{
+            {
+                "filter-string",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str == 'val1' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[0]))
+            },
+            {
+                "filter-none",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str == 'val2' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+            },
+            {
+                "filter-not-equal",
+                of("instance_cpu_percentage", sf),
+                "instance_cpu_percentage.filter({ tags -> tags.str != 'val1' })",
+                Result.success(SampleFamily.build(sf.context, sf.samples[1]))
+            },
+        });
+    }
+
+    @Test
+    public void test() {
+        Expression e = DSL.parse(expression);
+        Result r = e.run(input);
+        assertThat(r, is(want));
+    }
+}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
index ab9af94..1fdf28a 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/oc/OCMetricHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.receiver.otel.oc;
 
+import com.google.common.base.Strings;
 import com.google.protobuf.Timestamp;
 import io.grpc.stub.StreamObserver;
 import io.opencensus.proto.agent.common.v1.Node;
@@ -75,6 +76,10 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
                             nodeLabels.put("node_identifier_pid", String.valueOf(node.getIdentifier().getPid()));
                         }
                     }
+                    final String name = node.getServiceInfo().getName();
+                    if (!Strings.isNullOrEmpty(name)) {
+                        nodeLabels.put("job_name", name);
+                    }
                 }
                 metrics.forEach(m -> m.toMeter(request.getMetricsList().stream()
                     .flatMap(metric -> metric.getTimeseriesList().stream().map(timeSeries ->
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
index eebcf46..b167c5f 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/config/ZabbixConfig.java
@@ -28,6 +28,7 @@ public class ZabbixConfig implements MetricRuleConfig {
 
     private String metricPrefix;
     private String expSuffix;
+    private String filter;
     private Entities entities;
     private List<String> requiredZabbixItemKeys;
     private List<Metric> metrics;
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
index f6468a8..752a6bd 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/oap.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+filter: "{ tags -> tags.job_name == 'skywalking-so11y' }"
 expSuffix: tag({tags -> tags.service = 'oap::' + tags.service}).instance(['service'], ['host_name'])
 metricPrefix: meter_oap
 metricsRules:
diff --git a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
index a2d0437..b030150 100644
--- a/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-oc-rules/vm.yaml
@@ -28,6 +28,7 @@
 #    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
 #    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
 # </pre>
+filter: "{ tags -> tags.job_name == 'vm-monitoring' }"
 expSuffix: tag({tags -> tags.node_identifier_host_name = 'vm::' + tags.node_identifier_host_name}).service(['node_identifier_host_name'])
 metricPrefix: meter_vm
 metricsRules:
diff --git a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
index a0b06bc..770232e 100644
--- a/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
+++ b/test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml
@@ -17,7 +17,7 @@ receivers:
   prometheus:
     config:
       scrape_configs:
-        - job_name: 'otel-collector'
+        - job_name: 'vm-monitoring' # make sure to use this in the vm.yaml to filter only VM metrics
           scrape_interval: 10s
           static_configs:
             - targets: [ 'vm-service:9100' ]