You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/12/07 21:48:37 UTC

[skywalking] branch master updated: Fix MAL concurrent execution issues (#5965)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 972145c  Fix MAL concurrent execution issues (#5965)
972145c is described below

commit 972145c2083936d83d8d83270e2ffa84cdb04dd5
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Dec 8 05:43:39 2020 +0800

    Fix MAL concurrent execution issues (#5965)
    
    * Filter expression based on input samples
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    * Fix MAL concurrent issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
 CHANGES.md                                         |  1 +
 .../skywalking/oap/meter/analyzer/Analyzer.java    | 15 +++++-
 .../oap/meter/analyzer/dsl/Expression.java         | 54 ++++++++++++++++------
 .../analyzer/dsl/ExpressionParsingContext.java     |  3 ++
 .../skywalking/oap/meter/analyzer/dsl/Result.java  |  8 ----
 .../meter/analyzer/dsl/ExpressionParsingTest.java  |  1 +
 6 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d90a2f0..228bf47 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 * Make meter receiver support MAL.
 * Support influxDB connection response format option. Fix some error when use JSON as influxDB response format.
 * Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters.
+* Fix MAL concurrent execution issues.
 
 #### UI
 * Fix un-removed tags in trace query.
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 96253f4..457d017 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
+import java.util.List;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
@@ -79,6 +80,8 @@ public class Analyzer {
 
     private static final String FUNCTION_NAME_TEMP = "%s%s";
 
+    private List<String> samples;
+
     private final String metricName;
 
     private final Expression expression;
@@ -95,7 +98,15 @@ public class Analyzer {
      * @param sampleFamilies input samples.
      */
     public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
-        Result r = expression.run(sampleFamilies);
+        ImmutableMap<String, SampleFamily> input = samples.stream().map(s -> Tuple.of(s, sampleFamilies.get(s)))
+            .filter(t -> t._2 != null).collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+        if (input.size() < 1) {
+            if (log.isDebugEnabled()) {
+                log.debug("{} is ignored due to the lack of {}", expression, samples);
+            }
+            return;
+        }
+        Result r = expression.run(input);
         if (!r.isSuccess()) {
             return;
         }
@@ -145,7 +156,6 @@ public class Analyzer {
                           send(v, time);
                       });
                 break;
-
         }
     }
 
@@ -183,6 +193,7 @@ public class Analyzer {
     }
 
     private void init(final ExpressionParsingContext ctx) {
+        this.samples = ctx.getSamples();
         if (ctx.isHistogram()) {
             if (ctx.getPercentiles() != null && ctx.getPercentiles().length > 0) {
                 metricType = MetricType.histogramPercentile;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index 52b62b3..7cf84f6 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,20 +23,28 @@ import groovy.lang.ExpandoMetaClass;
 import groovy.lang.GroovyObjectSupport;
 import groovy.util.DelegatingScript;
 import java.time.Instant;
-import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 /**
  * Expression is a reusable monadic container type which represents a DSL expression.
  */
 @Slf4j
-@RequiredArgsConstructor
+@ToString(of = {"literal"})
 public class Expression {
 
     private final String literal;
 
     private final DelegatingScript expression;
 
+    private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+
+    public Expression(final String literal, final DelegatingScript expression) {
+        this.literal = literal;
+        this.expression = expression;
+        this.empower();
+    }
+
     /**
      * Parse the expression statically.
      *
@@ -48,6 +56,9 @@ public class Expression {
             if (!r.isSuccess() && r.isThrowable()) {
                 throw new ExpressionParsingException("failed to parse expression: " + literal + ", error:" + r.getError());
             }
+            if (log.isDebugEnabled()) {
+                log.debug("\"{}\" is parsed", literal);
+            }
             ctx.validate(literal);
             return ctx;
         }
@@ -60,14 +71,40 @@ public class Expression {
      * @return The result of execution.
      */
     public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+        propertyRepository.set(sampleFamilies);
+        try {
+            SampleFamily sf = (SampleFamily) expression.run();
+            if (sf == SampleFamily.EMPTY) {
+                if (!ExpressionParsingContext.get().isPresent()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("result of {} is empty by \"{}\"", sampleFamilies, literal);
+                    }
+                }
+                return Result.fail("Parsed result is an EMPTY sample family");
+            }
+            return Result.success(sf);
+        } catch (Throwable t) {
+            log.error("failed to run \"{}\"", literal, t);
+            return Result.fail(t);
+        } finally {
+            propertyRepository.remove();
+        }
+    }
+
+    private void empower() {
         expression.setDelegate(new GroovyObjectSupport() {
 
             public SampleFamily propertyMissing(String metricName) {
+                ExpressionParsingContext.get().ifPresent(ctx -> ctx.samples.add(metricName));
+                ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+                if (sampleFamilies == null) {
+                    return SampleFamily.EMPTY;
+                }
                 if (sampleFamilies.containsKey(metricName)) {
                     return sampleFamilies.get(metricName);
                 }
-                if (log.isDebugEnabled()) {
-                    log.debug("{} doesn't exist in {}", metricName, sampleFamilies.keySet());
+                if (!ExpressionParsingContext.get().isPresent()) {
+                    log.warn("{} referred by \"{}\" doesn't exist in {}", metricName, literal, sampleFamilies.keySet());
                 }
                 return SampleFamily.EMPTY;
             }
@@ -88,15 +125,6 @@ public class Expression {
 
         });
         extendNumber(Number.class);
-        try {
-            SampleFamily sf = (SampleFamily) expression.run();
-            if (sf == SampleFamily.EMPTY) {
-                return Result.fail("Parsed result is an EMPTY sample family");
-            }
-            return Result.success(sf);
-        } catch (Throwable t) {
-            return Result.fail(t);
-        }
     }
 
     private void extendNumber(Class clazz) {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index 9d952f8..3e9fa3a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -42,6 +42,7 @@ public class ExpressionParsingContext implements Closeable {
     static ExpressionParsingContext create() {
         if (CACHE.get() == null) {
             CACHE.set(ExpressionParsingContext.builder()
+                                              .samples(Lists.newArrayList())
                                               .downsampling(DownsamplingType.AVG)
                                               .scopeLabels(Lists.newArrayList())
                                               .aggregationLabels(Lists.newArrayList()).build());
@@ -55,6 +56,8 @@ public class ExpressionParsingContext implements Closeable {
 
     private final static ThreadLocal<ExpressionParsingContext> CACHE = new ThreadLocal<>();
 
+    List<String> samples;
+
     boolean isHistogram;
 
     int[] percentiles;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
index a90ad51..1952196 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
@@ -23,12 +23,10 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * Result indicates the parsing result of expression.
  */
-@Slf4j
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 @EqualsAndHashCode
 @ToString
@@ -42,7 +40,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail(final Throwable throwable) {
-        log.info("Expression fails: {}", throwable.getMessage());
         return new Result(false, true, throwable.getMessage(), SampleFamily.EMPTY);
     }
 
@@ -53,7 +50,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail(String message) {
-        log.info("Expression fails: {}", message);
         return new Result(false, false, message, SampleFamily.EMPTY);
     }
 
@@ -63,7 +59,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail() {
-        log.info("Expression fails");
         return new Result(false, false, null, SampleFamily.EMPTY);
     }
 
@@ -74,9 +69,6 @@ public class Result {
      * @return successful result.
      */
     public static Result success(SampleFamily sf) {
-        if (log.isDebugEnabled()) {
-            log.debug("Result is successful, sample family is {}", sf);
-        }
         return new Result(true, false, null, sf);
     }
 
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
index 540362e..cca1801 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
@@ -65,6 +65,7 @@ public class ExpressionParsingTest {
                 "all",
                 "latest (foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr'])",
                 ExpressionParsingContext.builder()
+                                        .samples(Collections.singletonList("foo"))
                                         .scopeType(ScopeType.SERVICE)
                                         .scopeLabels(Collections.singletonList("rr"))
                                         .aggregationLabels(Collections.singletonList("tt"))