You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/12/07 13:09:00 UTC

[skywalking] branch mal-patch created (now ed73020)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch mal-patch
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at ed73020  Fix MAL concurrent issues

This branch includes the following new commits:

     new db4bb69  Filter expression based on input samples
     new ed73020  Fix MAL concurrent issues

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/02: Filter expression based on input samples

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch mal-patch
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit db4bb693bf409c0c9893947117121c8f71316de5
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Dec 7 20:39:07 2020 +0800

    Filter expression based on input samples
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 CHANGES.md                                               |  1 +
 .../apache/skywalking/oap/meter/analyzer/Analyzer.java   | 15 +++++++++++++--
 .../skywalking/oap/meter/analyzer/dsl/Expression.java    | 16 ++++++++++++++--
 .../oap/meter/analyzer/dsl/ExpressionParsingContext.java |  3 +++
 .../apache/skywalking/oap/meter/analyzer/dsl/Result.java |  8 --------
 5 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ec78051..bf240c3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 #### OAP-Backend
 * Make meter receiver support MAL.
 * Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters.
+* Fix MAL concurrent execution issues
 
 #### UI
 * Fix un-removed tags in trace query.
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 96253f4..457d017 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
+import java.util.List;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
@@ -79,6 +80,8 @@ public class Analyzer {
 
     private static final String FUNCTION_NAME_TEMP = "%s%s";
 
+    private List<String> samples;
+
     private final String metricName;
 
     private final Expression expression;
@@ -95,7 +98,15 @@ public class Analyzer {
      * @param sampleFamilies input samples.
      */
     public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
-        Result r = expression.run(sampleFamilies);
+        ImmutableMap<String, SampleFamily> input = samples.stream().map(s -> Tuple.of(s, sampleFamilies.get(s)))
+            .filter(t -> t._2 != null).collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+        if (input.size() < 1) {
+            if (log.isDebugEnabled()) {
+                log.debug("{} is ignored due to the lack of {}", expression, samples);
+            }
+            return;
+        }
+        Result r = expression.run(input);
         if (!r.isSuccess()) {
             return;
         }
@@ -145,7 +156,6 @@ public class Analyzer {
                           send(v, time);
                       });
                 break;
-
         }
     }
 
@@ -183,6 +193,7 @@ public class Analyzer {
     }
 
     private void init(final ExpressionParsingContext ctx) {
+        this.samples = ctx.getSamples();
         if (ctx.isHistogram()) {
             if (ctx.getPercentiles() != null && ctx.getPercentiles().length > 0) {
                 metricType = MetricType.histogramPercentile;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index 52b62b3..0cedea9 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -24,6 +24,7 @@ import groovy.lang.GroovyObjectSupport;
 import groovy.util.DelegatingScript;
 import java.time.Instant;
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -31,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Slf4j
 @RequiredArgsConstructor
+@ToString(of = {"literal"})
 public class Expression {
 
     private final String literal;
@@ -48,6 +50,9 @@ public class Expression {
             if (!r.isSuccess() && r.isThrowable()) {
                 throw new ExpressionParsingException("failed to parse expression: " + literal + ", error:" + r.getError());
             }
+            if (log.isDebugEnabled()) {
+                log.debug("\"{}\" is parsed", literal);
+            }
             ctx.validate(literal);
             return ctx;
         }
@@ -63,11 +68,12 @@ public class Expression {
         expression.setDelegate(new GroovyObjectSupport() {
 
             public SampleFamily propertyMissing(String metricName) {
+                ExpressionParsingContext.get().ifPresent(ctx -> ctx.samples.add(metricName));
                 if (sampleFamilies.containsKey(metricName)) {
                     return sampleFamilies.get(metricName);
                 }
-                if (log.isDebugEnabled()) {
-                    log.debug("{} doesn't exist in {}", metricName, sampleFamilies.keySet());
+                if (!ExpressionParsingContext.get().isPresent()) {
+                    log.warn("{} referred by \"{}\" doesn't exist in {}", metricName, literal, sampleFamilies.keySet());
                 }
                 return SampleFamily.EMPTY;
             }
@@ -91,10 +97,16 @@ public class Expression {
         try {
             SampleFamily sf = (SampleFamily) expression.run();
             if (sf == SampleFamily.EMPTY) {
+                if (!ExpressionParsingContext.get().isPresent()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("result of {} is empty by \"{}\"", sampleFamilies, literal);
+                    }
+                }
                 return Result.fail("Parsed result is an EMPTY sample family");
             }
             return Result.success(sf);
         } catch (Throwable t) {
+            log.error("failed to run \"{}\"", literal, t);
             return Result.fail(t);
         }
     }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index 9d952f8..3e9fa3a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -42,6 +42,7 @@ public class ExpressionParsingContext implements Closeable {
     static ExpressionParsingContext create() {
         if (CACHE.get() == null) {
             CACHE.set(ExpressionParsingContext.builder()
+                                              .samples(Lists.newArrayList())
                                               .downsampling(DownsamplingType.AVG)
                                               .scopeLabels(Lists.newArrayList())
                                               .aggregationLabels(Lists.newArrayList()).build());
@@ -55,6 +56,8 @@ public class ExpressionParsingContext implements Closeable {
 
     private final static ThreadLocal<ExpressionParsingContext> CACHE = new ThreadLocal<>();
 
+    List<String> samples;
+
     boolean isHistogram;
 
     int[] percentiles;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
index a90ad51..1952196 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
@@ -23,12 +23,10 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * Result indicates the parsing result of expression.
  */
-@Slf4j
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 @EqualsAndHashCode
 @ToString
@@ -42,7 +40,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail(final Throwable throwable) {
-        log.info("Expression fails: {}", throwable.getMessage());
         return new Result(false, true, throwable.getMessage(), SampleFamily.EMPTY);
     }
 
@@ -53,7 +50,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail(String message) {
-        log.info("Expression fails: {}", message);
         return new Result(false, false, message, SampleFamily.EMPTY);
     }
 
@@ -63,7 +59,6 @@ public class Result {
      * @return failed result.
      */
     public static Result fail() {
-        log.info("Expression fails");
         return new Result(false, false, null, SampleFamily.EMPTY);
     }
 
@@ -74,9 +69,6 @@ public class Result {
      * @return successful result.
      */
     public static Result success(SampleFamily sf) {
-        if (log.isDebugEnabled()) {
-            log.debug("Result is successful, sample family is {}", sf);
-        }
         return new Result(true, false, null, sf);
     }
 


[skywalking] 02/02: Fix MAL concurrent issues

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch mal-patch
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit ed7302098b5a1359a5a7efe481ee441b50c4220b
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Dec 7 21:07:58 2020 +0800

    Fix MAL concurrent issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../oap/meter/analyzer/dsl/Expression.java         | 50 ++++++++++++++--------
 .../meter/analyzer/dsl/ExpressionParsingTest.java  |  1 +
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index 0cedea9..7cf84f6 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,7 +23,6 @@ import groovy.lang.ExpandoMetaClass;
 import groovy.lang.GroovyObjectSupport;
 import groovy.util.DelegatingScript;
 import java.time.Instant;
-import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
@@ -31,7 +30,6 @@ import lombok.extern.slf4j.Slf4j;
  * Expression is a reusable monadic container type which represents a DSL expression.
  */
 @Slf4j
-@RequiredArgsConstructor
 @ToString(of = {"literal"})
 public class Expression {
 
@@ -39,6 +37,14 @@ public class Expression {
 
     private final DelegatingScript expression;
 
+    private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+
+    public Expression(final String literal, final DelegatingScript expression) {
+        this.literal = literal;
+        this.expression = expression;
+        this.empower();
+    }
+
     /**
      * Parse the expression statically.
      *
@@ -65,10 +71,35 @@ public class Expression {
      * @return The result of execution.
      */
     public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+        propertyRepository.set(sampleFamilies);
+        try {
+            SampleFamily sf = (SampleFamily) expression.run();
+            if (sf == SampleFamily.EMPTY) {
+                if (!ExpressionParsingContext.get().isPresent()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("result of {} is empty by \"{}\"", sampleFamilies, literal);
+                    }
+                }
+                return Result.fail("Parsed result is an EMPTY sample family");
+            }
+            return Result.success(sf);
+        } catch (Throwable t) {
+            log.error("failed to run \"{}\"", literal, t);
+            return Result.fail(t);
+        } finally {
+            propertyRepository.remove();
+        }
+    }
+
+    private void empower() {
         expression.setDelegate(new GroovyObjectSupport() {
 
             public SampleFamily propertyMissing(String metricName) {
                 ExpressionParsingContext.get().ifPresent(ctx -> ctx.samples.add(metricName));
+                ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+                if (sampleFamilies == null) {
+                    return SampleFamily.EMPTY;
+                }
                 if (sampleFamilies.containsKey(metricName)) {
                     return sampleFamilies.get(metricName);
                 }
@@ -94,21 +125,6 @@ public class Expression {
 
         });
         extendNumber(Number.class);
-        try {
-            SampleFamily sf = (SampleFamily) expression.run();
-            if (sf == SampleFamily.EMPTY) {
-                if (!ExpressionParsingContext.get().isPresent()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("result of {} is empty by \"{}\"", sampleFamilies, literal);
-                    }
-                }
-                return Result.fail("Parsed result is an EMPTY sample family");
-            }
-            return Result.success(sf);
-        } catch (Throwable t) {
-            log.error("failed to run \"{}\"", literal, t);
-            return Result.fail(t);
-        }
     }
 
     private void extendNumber(Class clazz) {
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
index 540362e..cca1801 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
@@ -65,6 +65,7 @@ public class ExpressionParsingTest {
                 "all",
                 "latest (foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr'])",
                 ExpressionParsingContext.builder()
+                                        .samples(Collections.singletonList("foo"))
                                         .scopeType(ScopeType.SERVICE)
                                         .scopeLabels(Collections.singletonList("rr"))
                                         .aggregationLabels(Collections.singletonList("tt"))