You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/05/27 16:46:26 UTC
[incubator-druid] branch master updated: Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)
This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 42cf078 Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)
42cf078 is described below
commit 42cf07884345fa5fba70a17156678dc6c37ab3d1
Author: BIGrey <hu...@163.com>
AuthorDate: Tue May 28 00:46:17 2019 +0800
Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)
* AggregatorUtil should cache parsed expression to avoid memory problem (OOM/FGC) when Expression is used in metricsSpec
* remove debug log check in Parser.parse
* remove cache and use Suppliers.memoize
---
.../druid/benchmark/datagen/BenchmarkSchemas.java | 18 +++++
.../druid/query/aggregation/AggregatorUtil.java | 20 ++---
.../aggregation/SimpleDoubleAggregatorFactory.java | 10 ++-
.../aggregation/SimpleFloatAggregatorFactory.java | 10 ++-
.../aggregation/SimpleLongAggregatorFactory.java | 10 ++-
.../incremental/OnheapIncrementalIndexTest.java | 94 ++++++++++++++++++++++
6 files changed, 139 insertions(+), 23 deletions(-)
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
index cda9f47..3e35a66 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -21,6 +21,7 @@ package org.apache.druid.benchmark.datagen;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
@@ -85,6 +86,14 @@ public class BenchmarkSchemas
basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
basicSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+ List<AggregatorFactory> basicSchemaIngestAggsExpression = new ArrayList<>();
+ basicSchemaIngestAggsExpression.add(new CountAggregatorFactory("rows"));
+ basicSchemaIngestAggsExpression.add(new LongSumAggregatorFactory("sumLongSequential", null, "if(sumLongSequential>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumLongSequential,0)", ExprMacroTable.nil()));
+ basicSchemaIngestAggsExpression.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+ basicSchemaIngestAggsExpression.add(new DoubleSumAggregatorFactory("sumFloatNormal", null, "if(sumFloatNormal>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumFloatNormal,0)", ExprMacroTable.nil()));
+ basicSchemaIngestAggsExpression.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+ basicSchemaIngestAggsExpression.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
@@ -93,7 +102,16 @@ public class BenchmarkSchemas
basicSchemaDataInterval,
true
);
+
+ BenchmarkSchemaInfo basicSchemaExpression = new BenchmarkSchemaInfo(
+ basicSchemaColumns,
+ basicSchemaIngestAggsExpression,
+ basicSchemaDataInterval,
+ true
+ );
+
SCHEMA_MAP.put("basic", basicSchema);
+ SCHEMA_MAP.put("expression", basicSchemaExpression);
}
static { // simple single string column and count agg schema, no rollup
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index bf1edd2..e5e5f51 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -24,8 +24,6 @@ import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
@@ -184,9 +182,8 @@ public class AggregatorUtil
*/
static BaseFloatColumnValueSelector makeColumnValueSelectorWithFloatDefault(
final ColumnSelectorFactory metricFactory,
- final ExprMacroTable macroTable,
@Nullable final String fieldName,
- @Nullable final String fieldExpression,
+ @Nullable final Expr fieldExpression,
final float nullValue
)
{
@@ -196,8 +193,7 @@ public class AggregatorUtil
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
- final Expr expr = Parser.parse(fieldExpression, macroTable);
- final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+ final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionFloatColumnSelector implements FloatColumnSelector
{
@Override
@@ -231,9 +227,8 @@ public class AggregatorUtil
*/
static BaseLongColumnValueSelector makeColumnValueSelectorWithLongDefault(
final ColumnSelectorFactory metricFactory,
- final ExprMacroTable macroTable,
@Nullable final String fieldName,
- @Nullable final String fieldExpression,
+ @Nullable final Expr fieldExpression,
final long nullValue
)
{
@@ -243,8 +238,7 @@ public class AggregatorUtil
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
- final Expr expr = Parser.parse(fieldExpression, macroTable);
- final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+ final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionLongColumnSelector implements LongColumnSelector
{
@Override
@@ -276,9 +270,8 @@ public class AggregatorUtil
*/
static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
final ColumnSelectorFactory metricFactory,
- final ExprMacroTable macroTable,
@Nullable final String fieldName,
- @Nullable final String fieldExpression,
+ @Nullable final Expr fieldExpression,
final double nullValue
)
{
@@ -288,8 +281,7 @@ public class AggregatorUtil
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
- final Expr expr = Parser.parse(fieldExpression, macroTable);
- final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+ final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionDoubleColumnSelector implements DoubleColumnSelector
{
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
index 9dc8a2d..bb3cfa7 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
@@ -43,6 +46,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
protected final String expression;
protected final ExprMacroTable macroTable;
protected final boolean storeDoubleAsFloat;
+ protected final Supplier<Expr> fieldExpression;
public SimpleDoubleAggregatorFactory(
ExprMacroTable macroTable,
@@ -56,6 +60,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
this.fieldName = fieldName;
this.expression = expression;
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
+ this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
@@ -67,9 +72,8 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
{
return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault(
metricFactory,
- macroTable,
fieldName,
- expression,
+ fieldExpression.get(),
nullValue
);
}
@@ -117,7 +121,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
{
return fieldName != null
? Collections.singletonList(fieldName)
- : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+ : Parser.findRequiredBindings(fieldExpression.get());
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
index c535088..6b43113 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
@@ -41,6 +44,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
@Nullable
protected final String expression;
protected final ExprMacroTable macroTable;
+ protected final Supplier<Expr> fieldExpression;
public SimpleFloatAggregatorFactory(
ExprMacroTable macroTable,
@@ -53,6 +57,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
+ this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
@@ -64,9 +69,8 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
{
return AggregatorUtil.makeColumnValueSelectorWithFloatDefault(
metricFactory,
- macroTable,
fieldName,
- expression,
+ fieldExpression.get(),
nullValue
);
}
@@ -111,7 +115,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
{
return fieldName != null
? Collections.singletonList(fieldName)
- : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+ : Parser.findRequiredBindings(fieldExpression.get());
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
index ec7e222..f53a57d 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.segment.BaseLongColumnValueSelector;
@@ -41,6 +44,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
@Nullable
protected final String expression;
protected final ExprMacroTable macroTable;
+ protected final Supplier<Expr> fieldExpression;
public SimpleLongAggregatorFactory(
ExprMacroTable macroTable,
@@ -53,6 +57,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
+ this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
@@ -64,9 +69,8 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
{
return AggregatorUtil.makeColumnValueSelectorWithLongDefault(
metricFactory,
- macroTable,
fieldName,
- expression,
+ fieldExpression.get(),
nullValue
);
}
@@ -107,7 +111,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
{
return fieldName != null
? Collections.singletonList(fieldName)
- : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+ : Parser.findRequiredBindings(fieldExpression.get());
}
@Override
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 76ce9d7..c5977a0 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -19,12 +19,17 @@
package org.apache.druid.segment.incremental;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregator;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -104,6 +109,95 @@ public class OnheapIncrementalIndexTest
}
@Test
+ public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception
+ {
+ final IncrementalIndex indexExpr = new IncrementalIndex.Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new LongSumAggregatorFactory(
+ "oddnum",
+ null,
+ "if(value%2==1,1,0)",
+ TestExprMacroTable.INSTANCE
+ ))
+ .withRollup(true)
+ .build()
+ )
+ .setMaxRowCount(MAX_ROWS)
+ .buildOnheap();
+
+ final IncrementalIndex indexJs = new IncrementalIndex.Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new JavaScriptAggregatorFactory(
+ "oddnum",
+ ImmutableList.of("value"),
+ "function(current, value) { if (value%2==1) current = current + 1; return current;}",
+ "function() {return 0;}",
+ "function(a, b) { return a + b;}",
+ JavaScriptConfig.getEnabledInstance()
+ ))
+ .withRollup(true)
+ .build()
+ )
+ .setMaxRowCount(MAX_ROWS)
+ .buildOnheap();
+
+ final int addThreadCount = 2;
+ Thread[] addThreads = new Thread[addThreadCount];
+ for (int i = 0; i < addThreadCount; ++i) {
+ addThreads[i] = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ final Random random = ThreadLocalRandom.current();
+ try {
+ for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) {
+ int randomInt = random.nextInt(100000);
+ MapBasedInputRow mapBasedInputRowExpr = new MapBasedInputRow(
+ 0,
+ Collections.singletonList("billy"),
+ ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
+ );
+ MapBasedInputRow mapBasedInputRowJs = new MapBasedInputRow(
+ 0,
+ Collections.singletonList("billy"),
+ ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
+ );
+ indexExpr.add(mapBasedInputRowExpr);
+ indexJs.add(mapBasedInputRowJs);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ addThreads[i].start();
+ }
+
+ for (int i = 0; i < addThreadCount; ++i) {
+ addThreads[i].join();
+ }
+
+ long exprSum = 0;
+ long jsSum = 0;
+
+ for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) {
+ exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0);
+ }
+
+ for (IncrementalIndexRow row : indexJs.getFacts().keySet()) {
+ jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0);
+ }
+
+ Assert.assertEquals(exprSum, jsSum);
+ }
+
+ @Test
public void testOnHeapIncrementalIndexClose() throws Exception
{
// Prepare the mocks & set close() call count expectation to 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org