You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/26 23:15:11 UTC

[GitHub] fjy closed pull request #6229: [Backport] SQL: Finalize aggregations for inner queries when necessary.

fjy closed pull request #6229: [Backport] SQL: Finalize aggregations for inner queries when necessary.
URL: https://github.com/apache/incubator-druid/pull/6229
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index b9c8d3d3d46..28c0d65b10d 100644
--- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -48,6 +48,7 @@
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -62,6 +63,7 @@ public SqlAggFunction calciteFunction()
     return FUNCTION_INSTANCE;
   }
 
+  @Nullable
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
@@ -70,7 +72,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     final DruidExpression input = Expressions.toDruidExpression(
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
index d5da02d37b7..abc697c59e7 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
@@ -20,13 +20,13 @@
 package io.druid.sql.calcite.aggregation;
 
 import com.google.common.collect.ImmutableList;
-import io.druid.java.util.common.StringUtils;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.VirtualColumn;
 import io.druid.segment.column.ValueType;
 import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -85,7 +85,7 @@ public DimensionSpec toDimensionSpec()
   @Nullable
   public String getVirtualColumnName()
   {
-    return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName);
+    return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v");
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
index e6983ffb87a..dcf2c4e8d20 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -53,6 +53,9 @@
    * @param project              project that should be applied before aggregation; may be null
    * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
    *                             ignored if you do not want to re-use existing aggregations.
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. This is set for subqueries where Druid's native query
+   *                             layer does not do this automatically.
    *
    * @return aggregation, or null if the call cannot be translated
    */
@@ -64,6 +67,7 @@ Aggregation toDruidAggregation(
       String name,
       AggregateCall aggregateCall,
       Project project,
-      List<Aggregation> existingAggregations
+      List<Aggregation> existingAggregations,
+      boolean finalizeAggregations
   );
 }
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
index 161c3ef9c31..0abdb8a7206 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
@@ -22,9 +22,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
@@ -52,6 +52,7 @@
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class ApproxCountDistinctSqlAggregator implements SqlAggregator
@@ -74,7 +75,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
@@ -92,14 +94,15 @@ public Aggregation toDruidAggregation(
 
     final List<VirtualColumn> virtualColumns = new ArrayList<>();
     final AggregatorFactory aggregatorFactory;
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
 
     if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
-      aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true);
+      aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true);
     } else {
       final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
       final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
       if (inputType == null) {
-        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name);
+        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
       }
 
       final DimensionSpec dimensionSpec;
@@ -108,7 +111,7 @@ public Aggregation toDruidAggregation(
         dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
       } else {
         final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
-            StringUtils.format("%s:v", name),
+            Calcites.makePrefixedName(name, "v"),
             inputType,
             plannerContext.getExprMacroTable()
         );
@@ -116,10 +119,20 @@ public Aggregation toDruidAggregation(
         virtualColumns.add(virtualColumn);
       }
 
-      aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
+      aggregatorFactory = new CardinalityAggregatorFactory(
+          aggregatorName,
+          null,
+          ImmutableList.of(dimensionSpec),
+          false,
+          true
+      );
     }
 
-    return Aggregation.create(virtualColumns, aggregatorFactory);
+    return Aggregation.create(
+        virtualColumns,
+        Collections.singletonList(aggregatorFactory),
+        finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null
+    );
   }
 
   private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
index 9948c96f4b3..32de829cf4d 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
@@ -21,7 +21,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import io.druid.java.util.common.StringUtils;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
@@ -32,6 +31,7 @@
 import io.druid.sql.calcite.aggregation.Aggregations;
 import io.druid.sql.calcite.aggregation.SqlAggregator;
 import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.PlannerContext;
 import io.druid.sql.calcite.table.RowSignature;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -61,7 +61,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
@@ -102,8 +103,8 @@ public Aggregation toDruidAggregation(
       expression = arg.getExpression();
     }
 
-    final String sumName = StringUtils.format("%s:sum", name);
-    final String countName = StringUtils.format("%s:count", name);
+    final String sumName = Calcites.makePrefixedName(name, "sum");
+    final String countName = Calcites.makePrefixedName(name, "count");
     final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
         sumType,
         sumName,
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index f8a359bf1c4..d82e581977d 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
@@ -87,7 +88,8 @@ public Aggregation toDruidAggregation(
             name,
             aggregateCall,
             project,
-            existingAggregations
+            existingAggregations,
+            finalizeAggregations
         );
       } else {
         return null;
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
index 8be9bdf5538..6072c114dc5 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
index f23f2d6c3b7..fb6d00983b4 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
index bea1f254f9c..6b7a9ba0353 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
index ac566a68605..bf39def562b 100644
--- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
@@ -297,21 +297,26 @@ public static boolean isIntLiteral(final RexNode rexNode)
     return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
   }
 
-  public static String findOutputNamePrefix(final String basePrefix, final NavigableSet<String> strings)
+  public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
   {
     String prefix = basePrefix;
 
-    while (!isUsablePrefix(strings, prefix)) {
+    while (!isUnusedPrefix(prefix, strings)) {
       prefix = "_" + prefix;
     }
 
     return prefix;
   }
 
-  private static boolean isUsablePrefix(final NavigableSet<String> strings, final String prefix)
+  private static boolean isUnusedPrefix(final String prefix, final NavigableSet<String> strings)
   {
     // ":" is one character after "9"
     final NavigableSet<String> subSet = strings.subSet(prefix + "0", true, prefix + ":", false);
     return subSet.isEmpty();
   }
+
+  public static String makePrefixedName(final String prefix, final String suffix)
+  {
+    return StringUtils.format("%s:%s", prefix, suffix);
+  }
 }
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
index b54cf2d0dd2..56e80c98fba 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -88,7 +88,11 @@ public PartialDruidQuery getPartialDruidQuery()
   @Override
   public Sequence<Object[]> runQuery()
   {
-    final DruidQuery query = toDruidQuery();
+    // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+    // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+    // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+    final DruidQuery query = toDruidQuery(false);
     if (query != null) {
       return getQueryMaker().runQuery(query);
     } else {
@@ -116,9 +120,11 @@ public int getQueryCount()
 
   @Nullable
   @Override
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
-    final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
+    // Must finalize aggregations on subqueries.
+
+    final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
     if (subQuery == null) {
       return null;
     }
@@ -128,7 +134,8 @@ public DruidQuery toDruidQuery()
         new QueryDataSource(subQuery.toGroupByQuery()),
         sourceRowSignature,
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        finalizeAggregations
     );
   }
 
@@ -142,7 +149,8 @@ public DruidQuery toDruidQueryForExplaining()
             sourceRel.getRowType()
         ),
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        false
     );
   }
 
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
index bca4481992f..9740f681551 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
@@ -115,7 +115,8 @@ public DruidQuery(
       final DataSource dataSource,
       final RowSignature sourceRowSignature,
       final PlannerContext plannerContext,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     this.dataSource = dataSource;
@@ -126,7 +127,7 @@ public DruidQuery(
     // Now the fun begins.
     this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext);
     this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature);
-    this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder);
+    this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations);
 
     if (this.selectProjection != null) {
       this.outputRowSignature = this.selectProjection.getOutputRowSignature();
@@ -199,7 +200,7 @@ private static SelectProjection computeSelectProjection(
     final List<VirtualColumn> virtualColumns = new ArrayList<>();
     final List<String> rowOrder = new ArrayList<>();
 
-    final String virtualColumnPrefix = Calcites.findOutputNamePrefix(
+    final String virtualColumnPrefix = Calcites.findUnusedPrefix(
         "v",
         new TreeSet<>(sourceRowSignature.getRowOrder())
     );
@@ -231,7 +232,8 @@ private static Grouping computeGrouping(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
       final RowSignature sourceRowSignature,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     final Aggregate aggregate = partialQuery.getAggregate();
@@ -246,7 +248,8 @@ private static Grouping computeGrouping(
         partialQuery,
         plannerContext,
         sourceRowSignature,
-        rexBuilder
+        rexBuilder,
+        finalizeAggregations
     );
 
     final RowSignature aggregateRowSignature = RowSignature.from(
@@ -340,7 +343,7 @@ private static Grouping computeGrouping(
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<DimensionExpression> dimensions = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
     int outputNameCounter = 0;
 
     for (int i : aggregate.getGroupSet()) {
@@ -372,10 +375,13 @@ private static Grouping computeGrouping(
   /**
    * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
    *
-   * @param partialQuery       partial query
-   * @param plannerContext     planner context
-   * @param sourceRowSignature source row signature
-   * @param rexBuilder         calcite RexBuilder
+   * @param partialQuery         partial query
+   * @param plannerContext       planner context
+   * @param sourceRowSignature   source row signature
+   * @param rexBuilder           calcite RexBuilder
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. Useful for subqueries where Druid's native query layer
+   *                             does not do this automatically.
    *
    * @return aggregations
    *
@@ -385,12 +391,13 @@ private static Grouping computeGrouping(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
       final RowSignature sourceRowSignature,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<Aggregation> aggregations = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
 
     for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
       final String aggName = outputNamePrefix + i;
@@ -402,7 +409,8 @@ private static Grouping computeGrouping(
           partialQuery.getSelectProject(),
           aggCall,
           aggregations,
-          aggName
+          aggName,
+          finalizeAggregations
       );
 
       if (aggregation == null) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
index 5d0ea438106..c304e5babd7 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
@@ -93,20 +93,21 @@ public static DruidQueryRel fullScan(
 
   @Override
   @Nonnull
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     return partialQuery.build(
         druidTable.getDataSource(),
         druidTable.getRowSignature(),
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        finalizeAggregations
     );
   }
 
   @Override
   public DruidQuery toDruidQueryForExplaining()
   {
-    return toDruidQuery();
+    return toDruidQuery(false);
   }
 
   @Override
@@ -169,7 +170,11 @@ public int getQueryCount()
   @Override
   public Sequence<Object[]> runQuery()
   {
-    return getQueryMaker().runQuery(toDruidQuery());
+    // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+    // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+    // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+    return getQueryMaker().runQuery(toDruidQuery(false));
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
index 32ff1206f4d..9244b504b27 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
@@ -76,12 +76,16 @@ public boolean isValidDruidQuery()
    *
    * This method may return null if it knows that this rel will yield an empty result set.
    *
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. Useful for subqueries where Druid's native query layer
+   *                             does not do this automatically.
+   *
    * @return query, or null if it is known in advance that this rel will yield an empty result set.
    *
    * @throws CannotBuildQueryException
    */
   @Nullable
-  public abstract DruidQuery toDruidQuery();
+  public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
 
   /**
    * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
index 5d6abfc1f01..ecfd8bbb2b6 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
@@ -142,10 +142,10 @@ public DruidSemiJoin withPartialQuery(final PartialDruidQuery newQueryBuilder)
 
   @Nullable
   @Override
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     final DruidRel rel = getLeftRelWithFilter();
-    return rel != null ? rel.toDruidQuery() : null;
+    return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
index 263c2e09a1c..01c960c918f 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -246,10 +246,11 @@ public DruidQuery build(
       final DataSource dataSource,
       final RowSignature sourceRowSignature,
       final PlannerContext plannerContext,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
-    return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder);
+    return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
   }
 
   public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
index 5ec46abfd01..b565aba995a 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
@@ -101,7 +101,7 @@ public DruidQueryRule(
     {
       super(
           operand(relClass, operand(DruidRel.class, any())),
-          StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage)
+          StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
       );
       this.stage = stage;
       this.f = f;
@@ -229,7 +229,7 @@ public void onMatch(final RelOptRuleCall call)
 
     public DruidOuterQueryRule(final RelOptRuleOperand op, final String description)
     {
-      super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description));
+      super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description));
     }
 
     @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
index 13089816a4a..e3b52b49703 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
@@ -57,7 +57,8 @@ public static Aggregation translateAggregateCall(
       final Project project,
       final AggregateCall call,
       final List<Aggregation> existingAggregations,
-      final String name
+      final String name,
+      final boolean finalizeAggregations
   )
   {
     final DimFilter filter;
@@ -125,7 +126,8 @@ public static Aggregation translateAggregateCall(
         name,
         call,
         project,
-        existingAggregationsWithSameFilter
+        existingAggregationsWithSameFilter,
+        finalizeAggregations
     );
 
     if (retVal == null) {
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index 19722de6ee0..5bfa8a3bfab 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -48,6 +48,7 @@
 import io.druid.query.aggregation.LongMinAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ExpressionPostAggregator;
@@ -4032,6 +4033,74 @@ public void testExactCountDistinctUsingSubquery() throws Exception
     );
   }
 
+  @Test
+  public void testAvgDailyCountDistinct() throws Exception
+  {
+    testQuery(
+        "SELECT\n"
+        + "  AVG(u)\n"
+        + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            new QueryDataSource(
+                                GroupByQuery.builder()
+                                            .setDataSource(CalciteTests.DATASOURCE1)
+                                            .setInterval(QSS(Filtration.eternity()))
+                                            .setGranularity(Granularities.ALL)
+                                            .setVirtualColumns(
+                                                EXPRESSION_VIRTUAL_COLUMN(
+                                                    "d0:v",
+                                                    "timestamp_floor(\"__time\",'P1D','','UTC')",
+                                                    ValueType.LONG
+                                                )
+                                            )
+                                            .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
+                                            .setAggregatorSpecs(
+                                                AGGS(
+                                                    new CardinalityAggregatorFactory(
+                                                        "a0:a",
+                                                        null,
+                                                        DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)),
+                                                        false,
+                                                        true
+                                                    )
+                                                )
+                                            )
+                                            .setPostAggregatorSpecs(
+                                                ImmutableList.of(
+                                                    new HyperUniqueFinalizingPostAggregator("a0", "a0:a")
+                                                )
+                                            )
+                                            .setContext(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                            )
+                        )
+                        .setInterval(QSS(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(AGGS(
+                            new LongSumAggregatorFactory("_a0:sum", "a0"),
+                            new CountAggregatorFactory("_a0:count")
+                        ))
+                        .setPostAggregatorSpecs(
+                            ImmutableList.of(
+                                new ArithmeticPostAggregator(
+                                    "_a0",
+                                    "quotient",
+                                    ImmutableList.of(
+                                        new FieldAccessPostAggregator(null, "_a0:sum"),
+                                        new FieldAccessPostAggregator(null, "_a0:count")
+                                    )
+                                )
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{1L})
+    );
+  }
+
   @Test
   public void testTopNFilterJoin() throws Exception
   {
diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
index 937ba7f8109..ae0cbf2eca0 100644
--- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
@@ -41,16 +41,16 @@ public void testEscapeStringLiteral()
   }
 
   @Test
-  public void testFindOutputNamePrefix()
+  public void testFindUnusedPrefix()
   {
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
-    Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
-    Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
-    Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
-    Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org