You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/12/07 21:48:37 UTC
[skywalking] branch master updated: Fix MAL concurrent execution issues (#5965)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 972145c Fix MAL concurrent execution issues (#5965)
972145c is described below
commit 972145c2083936d83d8d83270e2ffa84cdb04dd5
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Dec 8 05:43:39 2020 +0800
Fix MAL concurrent execution issues (#5965)
* Filter expression based on input samples
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Fix MAL concurrent issues
Signed-off-by: Gao Hongtao <ha...@gmail.com>
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
CHANGES.md | 1 +
.../skywalking/oap/meter/analyzer/Analyzer.java | 15 +++++-
.../oap/meter/analyzer/dsl/Expression.java | 54 ++++++++++++++++------
.../analyzer/dsl/ExpressionParsingContext.java | 3 ++
.../skywalking/oap/meter/analyzer/dsl/Result.java | 8 ----
.../meter/analyzer/dsl/ExpressionParsingTest.java | 1 +
6 files changed, 59 insertions(+), 23 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d90a2f0..228bf47 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
* Make meter receiver support MAL.
* Support influxDB connection response format option. Fix some error when use JSON as influxDB response format.
* Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters.
+* Fix MAL concurrent execution issues
#### UI
* Fix un-removed tags in trace query.
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 96253f4..457d017 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.vavr.Tuple;
import io.vavr.Tuple2;
+import java.util.List;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -79,6 +80,8 @@ public class Analyzer {
private static final String FUNCTION_NAME_TEMP = "%s%s";
+ private List<String> samples;
+
private final String metricName;
private final Expression expression;
@@ -95,7 +98,15 @@ public class Analyzer {
* @param sampleFamilies input samples.
*/
public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
- Result r = expression.run(sampleFamilies);
+ ImmutableMap<String, SampleFamily> input = samples.stream().map(s -> Tuple.of(s, sampleFamilies.get(s)))
+ .filter(t -> t._2 != null).collect(ImmutableMap.toImmutableMap(t -> t._1, t -> t._2));
+ if (input.size() < 1) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} is ignored due to the lack of {}", expression, samples);
+ }
+ return;
+ }
+ Result r = expression.run(input);
if (!r.isSuccess()) {
return;
}
@@ -145,7 +156,6 @@ public class Analyzer {
send(v, time);
});
break;
-
}
}
@@ -183,6 +193,7 @@ public class Analyzer {
}
private void init(final ExpressionParsingContext ctx) {
+ this.samples = ctx.getSamples();
if (ctx.isHistogram()) {
if (ctx.getPercentiles() != null && ctx.getPercentiles().length > 0) {
metricType = MetricType.histogramPercentile;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
index 52b62b3..7cf84f6 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Expression.java
@@ -23,20 +23,28 @@ import groovy.lang.ExpandoMetaClass;
import groovy.lang.GroovyObjectSupport;
import groovy.util.DelegatingScript;
import java.time.Instant;
-import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* Expression is a reusable monadic container type which represents a DSL expression.
*/
@Slf4j
-@RequiredArgsConstructor
+@ToString(of = {"literal"})
public class Expression {
private final String literal;
private final DelegatingScript expression;
+ private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository = new ThreadLocal<>();
+
+ public Expression(final String literal, final DelegatingScript expression) {
+ this.literal = literal;
+ this.expression = expression;
+ this.empower();
+ }
+
/**
* Parse the expression statically.
*
@@ -48,6 +56,9 @@ public class Expression {
if (!r.isSuccess() && r.isThrowable()) {
throw new ExpressionParsingException("failed to parse expression: " + literal + ", error:" + r.getError());
}
+ if (log.isDebugEnabled()) {
+ log.debug("\"{}\" is parsed", literal);
+ }
ctx.validate(literal);
return ctx;
}
@@ -60,14 +71,40 @@ public class Expression {
* @return The result of execution.
*/
public Result run(final ImmutableMap<String, SampleFamily> sampleFamilies) {
+ propertyRepository.set(sampleFamilies);
+ try {
+ SampleFamily sf = (SampleFamily) expression.run();
+ if (sf == SampleFamily.EMPTY) {
+ if (!ExpressionParsingContext.get().isPresent()) {
+ if (log.isDebugEnabled()) {
+ log.debug("result of {} is empty by \"{}\"", sampleFamilies, literal);
+ }
+ }
+ return Result.fail("Parsed result is an EMPTY sample family");
+ }
+ return Result.success(sf);
+ } catch (Throwable t) {
+ log.error("failed to run \"{}\"", literal, t);
+ return Result.fail(t);
+ } finally {
+ propertyRepository.remove();
+ }
+ }
+
+ private void empower() {
expression.setDelegate(new GroovyObjectSupport() {
public SampleFamily propertyMissing(String metricName) {
+ ExpressionParsingContext.get().ifPresent(ctx -> ctx.samples.add(metricName));
+ ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
+ if (sampleFamilies == null) {
+ return SampleFamily.EMPTY;
+ }
if (sampleFamilies.containsKey(metricName)) {
return sampleFamilies.get(metricName);
}
- if (log.isDebugEnabled()) {
- log.debug("{} doesn't exist in {}", metricName, sampleFamilies.keySet());
+ if (!ExpressionParsingContext.get().isPresent()) {
+ log.warn("{} referred by \"{}\" doesn't exist in {}", metricName, literal, sampleFamilies.keySet());
}
return SampleFamily.EMPTY;
}
@@ -88,15 +125,6 @@ public class Expression {
});
extendNumber(Number.class);
- try {
- SampleFamily sf = (SampleFamily) expression.run();
- if (sf == SampleFamily.EMPTY) {
- return Result.fail("Parsed result is an EMPTY sample family");
- }
- return Result.success(sf);
- } catch (Throwable t) {
- return Result.fail(t);
- }
}
private void extendNumber(Class clazz) {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index 9d952f8..3e9fa3a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -42,6 +42,7 @@ public class ExpressionParsingContext implements Closeable {
static ExpressionParsingContext create() {
if (CACHE.get() == null) {
CACHE.set(ExpressionParsingContext.builder()
+ .samples(Lists.newArrayList())
.downsampling(DownsamplingType.AVG)
.scopeLabels(Lists.newArrayList())
.aggregationLabels(Lists.newArrayList()).build());
@@ -55,6 +56,8 @@ public class ExpressionParsingContext implements Closeable {
private final static ThreadLocal<ExpressionParsingContext> CACHE = new ThreadLocal<>();
+ List<String> samples;
+
boolean isHistogram;
int[] percentiles;
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
index a90ad51..1952196 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Result.java
@@ -23,12 +23,10 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
/**
* Result indicates the parsing result of expression.
*/
-@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@EqualsAndHashCode
@ToString
@@ -42,7 +40,6 @@ public class Result {
* @return failed result.
*/
public static Result fail(final Throwable throwable) {
- log.info("Expression fails: {}", throwable.getMessage());
return new Result(false, true, throwable.getMessage(), SampleFamily.EMPTY);
}
@@ -53,7 +50,6 @@ public class Result {
* @return failed result.
*/
public static Result fail(String message) {
- log.info("Expression fails: {}", message);
return new Result(false, false, message, SampleFamily.EMPTY);
}
@@ -63,7 +59,6 @@ public class Result {
* @return failed result.
*/
public static Result fail() {
- log.info("Expression fails");
return new Result(false, false, null, SampleFamily.EMPTY);
}
@@ -74,9 +69,6 @@ public class Result {
* @return successful result.
*/
public static Result success(SampleFamily sf) {
- if (log.isDebugEnabled()) {
- log.debug("Result is successful, sample family is {}", sf);
- }
return new Result(true, false, null, sf);
}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
index 540362e..cca1801 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingTest.java
@@ -65,6 +65,7 @@ public class ExpressionParsingTest {
"all",
"latest (foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr'])",
ExpressionParsingContext.builder()
+ .samples(Collections.singletonList("foo"))
.scopeType(ScopeType.SERVICE)
.scopeLabels(Collections.singletonList("rr"))
.aggregationLabels(Collections.singletonList("tt"))