You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/05/27 16:46:26 UTC

[incubator-druid] branch master updated: Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 42cf078  Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)
42cf078 is described below

commit 42cf07884345fa5fba70a17156678dc6c37ab3d1
Author: BIGrey <hu...@163.com>
AuthorDate: Tue May 28 00:46:17 2019 +0800

    Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716)
    
    * AggregatorUtil should cache the parsed expression to avoid memory problems (OOM/FGC) when an expression is used in metricsSpec
    
    * remove debug log check in Parser.parse
    
    * remove cache and use Suppliers.memoize
---
 .../druid/benchmark/datagen/BenchmarkSchemas.java  | 18 +++++
 .../druid/query/aggregation/AggregatorUtil.java    | 20 ++---
 .../aggregation/SimpleDoubleAggregatorFactory.java | 10 ++-
 .../aggregation/SimpleFloatAggregatorFactory.java  | 10 ++-
 .../aggregation/SimpleLongAggregatorFactory.java   | 10 ++-
 .../incremental/OnheapIncrementalIndexTest.java    | 94 ++++++++++++++++++++++
 6 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
index cda9f47..3e35a66 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -21,6 +21,7 @@ package org.apache.druid.benchmark.datagen;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
@@ -85,6 +86,14 @@ public class BenchmarkSchemas
     basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
     basicSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
 
+    List<AggregatorFactory> basicSchemaIngestAggsExpression = new ArrayList<>();
+    basicSchemaIngestAggsExpression.add(new CountAggregatorFactory("rows"));
+    basicSchemaIngestAggsExpression.add(new LongSumAggregatorFactory("sumLongSequential", null, "if(sumLongSequential>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumLongSequential,0)", ExprMacroTable.nil()));
+    basicSchemaIngestAggsExpression.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+    basicSchemaIngestAggsExpression.add(new DoubleSumAggregatorFactory("sumFloatNormal", null, "if(sumFloatNormal>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumFloatNormal,0)", ExprMacroTable.nil()));
+    basicSchemaIngestAggsExpression.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+    basicSchemaIngestAggsExpression.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
     Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
 
     BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
@@ -93,7 +102,16 @@ public class BenchmarkSchemas
         basicSchemaDataInterval,
         true
     );
+
+    BenchmarkSchemaInfo basicSchemaExpression = new BenchmarkSchemaInfo(
+        basicSchemaColumns,
+        basicSchemaIngestAggsExpression,
+        basicSchemaDataInterval,
+        true
+    );
+
     SCHEMA_MAP.put("basic", basicSchema);
+    SCHEMA_MAP.put("expression", basicSchemaExpression);
   }
 
   static { // simple single string column and count agg schema, no rollup
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index bf1edd2..e5e5f51 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -24,8 +24,6 @@ import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
@@ -184,9 +182,8 @@ public class AggregatorUtil
    */
   static BaseFloatColumnValueSelector makeColumnValueSelectorWithFloatDefault(
       final ColumnSelectorFactory metricFactory,
-      final ExprMacroTable macroTable,
       @Nullable final String fieldName,
-      @Nullable final String fieldExpression,
+      @Nullable final Expr fieldExpression,
       final float nullValue
   )
   {
@@ -196,8 +193,7 @@ public class AggregatorUtil
     if (fieldName != null) {
       return metricFactory.makeColumnValueSelector(fieldName);
     } else {
-      final Expr expr = Parser.parse(fieldExpression, macroTable);
-      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
       class ExpressionFloatColumnSelector implements FloatColumnSelector
       {
         @Override
@@ -231,9 +227,8 @@ public class AggregatorUtil
    */
   static BaseLongColumnValueSelector makeColumnValueSelectorWithLongDefault(
       final ColumnSelectorFactory metricFactory,
-      final ExprMacroTable macroTable,
       @Nullable final String fieldName,
-      @Nullable final String fieldExpression,
+      @Nullable final Expr fieldExpression,
       final long nullValue
   )
   {
@@ -243,8 +238,7 @@ public class AggregatorUtil
     if (fieldName != null) {
       return metricFactory.makeColumnValueSelector(fieldName);
     } else {
-      final Expr expr = Parser.parse(fieldExpression, macroTable);
-      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
       class ExpressionLongColumnSelector implements LongColumnSelector
       {
         @Override
@@ -276,9 +270,8 @@ public class AggregatorUtil
    */
   static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
       final ColumnSelectorFactory metricFactory,
-      final ExprMacroTable macroTable,
       @Nullable final String fieldName,
-      @Nullable final String fieldExpression,
+      @Nullable final Expr fieldExpression,
       final double nullValue
   )
   {
@@ -288,8 +281,7 @@ public class AggregatorUtil
     if (fieldName != null) {
       return metricFactory.makeColumnValueSelector(fieldName);
     } else {
-      final Expr expr = Parser.parse(fieldExpression, macroTable);
-      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr);
+      final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
       class ExpressionDoubleColumnSelector implements DoubleColumnSelector
       {
         @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
index 9dc8a2d..bb3cfa7 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
@@ -43,6 +46,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
   protected final String expression;
   protected final ExprMacroTable macroTable;
   protected final boolean storeDoubleAsFloat;
+  protected final Supplier<Expr> fieldExpression;
 
   public SimpleDoubleAggregatorFactory(
       ExprMacroTable macroTable,
@@ -56,6 +60,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
     this.fieldName = fieldName;
     this.expression = expression;
     this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
+    this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkArgument(
         fieldName == null ^ expression == null,
@@ -67,9 +72,8 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
   {
     return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault(
         metricFactory,
-        macroTable,
         fieldName,
-        expression,
+        fieldExpression.get(),
         nullValue
     );
   }
@@ -117,7 +121,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
   {
     return fieldName != null
            ? Collections.singletonList(fieldName)
-           : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+           : Parser.findRequiredBindings(fieldExpression.get());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
index c535088..6b43113 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
@@ -41,6 +44,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
   @Nullable
   protected final String expression;
   protected final ExprMacroTable macroTable;
+  protected final Supplier<Expr> fieldExpression;
 
   public SimpleFloatAggregatorFactory(
       ExprMacroTable macroTable,
@@ -53,6 +57,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
     this.name = name;
     this.fieldName = fieldName;
     this.expression = expression;
+    this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkArgument(
         fieldName == null ^ expression == null,
@@ -64,9 +69,8 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
   {
     return AggregatorUtil.makeColumnValueSelectorWithFloatDefault(
         metricFactory,
-        macroTable,
         fieldName,
-        expression,
+        fieldExpression.get(),
         nullValue
     );
   }
@@ -111,7 +115,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac
   {
     return fieldName != null
            ? Collections.singletonList(fieldName)
-           : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+           : Parser.findRequiredBindings(fieldExpression.get());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
index ec7e222..f53a57d 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
@@ -41,6 +44,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
   @Nullable
   protected final String expression;
   protected final ExprMacroTable macroTable;
+  protected final Supplier<Expr> fieldExpression;
 
   public SimpleLongAggregatorFactory(
       ExprMacroTable macroTable,
@@ -53,6 +57,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
     this.name = name;
     this.fieldName = fieldName;
     this.expression = expression;
+    this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable));
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkArgument(
         fieldName == null ^ expression == null,
@@ -64,9 +69,8 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
   {
     return AggregatorUtil.makeColumnValueSelectorWithLongDefault(
         metricFactory,
-        macroTable,
         fieldName,
-        expression,
+        fieldExpression.get(),
         nullValue
     );
   }
@@ -107,7 +111,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact
   {
     return fieldName != null
            ? Collections.singletonList(fieldName)
-           : Parser.findRequiredBindings(Parser.parse(expression, macroTable));
+           : Parser.findRequiredBindings(fieldExpression.get());
   }
 
   @Override
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 76ce9d7..c5977a0 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -19,12 +19,17 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregator;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -104,6 +109,95 @@ public class OnheapIncrementalIndexTest
   }
 
   @Test
+  public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception
+  {
+    final IncrementalIndex indexExpr = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.MINUTE)
+                .withMetrics(new LongSumAggregatorFactory(
+                    "oddnum",
+                    null,
+                    "if(value%2==1,1,0)",
+                    TestExprMacroTable.INSTANCE
+                ))
+                .withRollup(true)
+                .build()
+        )
+        .setMaxRowCount(MAX_ROWS)
+        .buildOnheap();
+
+    final IncrementalIndex indexJs = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.MINUTE)
+                .withMetrics(new JavaScriptAggregatorFactory(
+                    "oddnum",
+                    ImmutableList.of("value"),
+                    "function(current, value) { if (value%2==1) current = current + 1; return current;}",
+                    "function() {return 0;}",
+                    "function(a, b) { return a + b;}",
+                    JavaScriptConfig.getEnabledInstance()
+                ))
+                .withRollup(true)
+                .build()
+        )
+        .setMaxRowCount(MAX_ROWS)
+        .buildOnheap();
+
+    final int addThreadCount = 2;
+    Thread[] addThreads = new Thread[addThreadCount];
+    for (int i = 0; i < addThreadCount; ++i) {
+      addThreads[i] = new Thread(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          final Random random = ThreadLocalRandom.current();
+          try {
+            for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) {
+              int randomInt = random.nextInt(100000);
+              MapBasedInputRow mapBasedInputRowExpr = new MapBasedInputRow(
+                  0,
+                  Collections.singletonList("billy"),
+                  ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
+              );
+              MapBasedInputRow mapBasedInputRowJs = new MapBasedInputRow(
+                  0,
+                  Collections.singletonList("billy"),
+                  ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
+              );
+              indexExpr.add(mapBasedInputRowExpr);
+              indexJs.add(mapBasedInputRowJs);
+            }
+          }
+          catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      addThreads[i].start();
+    }
+
+    for (int i = 0; i < addThreadCount; ++i) {
+      addThreads[i].join();
+    }
+
+    long exprSum = 0;
+    long jsSum = 0;
+
+    for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) {
+      exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0);
+    }
+
+    for (IncrementalIndexRow row : indexJs.getFacts().keySet()) {
+      jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0);
+    }
+
+    Assert.assertEquals(exprSum, jsSum);
+  }
+
+  @Test
   public void testOnHeapIncrementalIndexClose() throws Exception
   {
     // Prepare the mocks & set close() call count expectation to 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org