You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/08/25 20:56:30 UTC

[incubator-druid] branch master updated: SQL: Finalize aggregations for inner queries when necessary. (#6221)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 28e6ae3  SQL: Finalize aggregations for inner queries when necessary. (#6221)
28e6ae3 is described below

commit 28e6ae3664b02b17ee691968469905f8df5a8e28
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Sat Aug 25 13:56:23 2018 -0700

    SQL: Finalize aggregations for inner queries when necessary. (#6221)
    
    * SQL: Finalize aggregations for inner queries when necessary.
    
    Fixes #5779.
    
    * Fixed test method name.
---
 .../histogram/sql/QuantileSqlAggregator.java       |  5 +-
 .../calcite/aggregation/DimensionExpression.java   |  4 +-
 .../sql/calcite/aggregation/SqlAggregator.java     |  6 +-
 .../builtin/ApproxCountDistinctSqlAggregator.java  | 27 +++++--
 .../aggregation/builtin/AvgSqlAggregator.java      |  9 ++-
 .../aggregation/builtin/CountSqlAggregator.java    |  6 +-
 .../aggregation/builtin/MaxSqlAggregator.java      |  3 +-
 .../aggregation/builtin/MinSqlAggregator.java      |  3 +-
 .../aggregation/builtin/SumSqlAggregator.java      |  3 +-
 .../io/druid/sql/calcite/planner/Calcites.java     | 11 ++-
 .../druid/sql/calcite/rel/DruidOuterQueryRel.java  | 18 +++--
 .../java/io/druid/sql/calcite/rel/DruidQuery.java  | 34 +++++----
 .../io/druid/sql/calcite/rel/DruidQueryRel.java    | 13 +++-
 .../java/io/druid/sql/calcite/rel/DruidRel.java    |  6 +-
 .../io/druid/sql/calcite/rel/DruidSemiJoin.java    |  4 +-
 .../druid/sql/calcite/rel/PartialDruidQuery.java   |  5 +-
 .../java/io/druid/sql/calcite/rule/DruidRules.java |  4 +-
 .../io/druid/sql/calcite/rule/GroupByRules.java    |  6 +-
 .../io/druid/sql/calcite/CalciteQueryTest.java     | 86 ++++++++++++++++++++--
 .../io/druid/sql/calcite/planner/CalcitesTest.java | 20 ++---
 20 files changed, 204 insertions(+), 69 deletions(-)

diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index 830771d..af6473b 100644
--- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -62,6 +63,7 @@ public class QuantileSqlAggregator implements SqlAggregator
     return FUNCTION_INSTANCE;
   }
 
+  @Nullable
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
@@ -70,7 +72,8 @@ public class QuantileSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     final DruidExpression input = Expressions.toDruidExpression(
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
index d2573e8..6d4ef48 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
@@ -20,13 +20,13 @@
 package io.druid.sql.calcite.aggregation;
 
 import com.google.common.collect.ImmutableList;
-import io.druid.java.util.common.StringUtils;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.VirtualColumn;
 import io.druid.segment.column.ValueType;
 import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -80,7 +80,7 @@ public class DimensionExpression
   @Nullable
   public String getVirtualColumnName()
   {
-    return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName);
+    return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v");
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
index 83ac84a..4c8777d 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -53,6 +53,9 @@ public interface SqlAggregator
    * @param project              project that should be applied before aggregation; may be null
    * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
    *                             ignored if you do not want to re-use existing aggregations.
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. This is set for subqueries where Druid's native query
+   *                             layer does not do this automatically.
    *
    * @return aggregation, or null if the call cannot be translated
    */
@@ -64,6 +67,7 @@ public interface SqlAggregator
       String name,
       AggregateCall aggregateCall,
       Project project,
-      List<Aggregation> existingAggregations
+      List<Aggregation> existingAggregations,
+      boolean finalizeAggregations
   );
 }
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
index dfb1c37..208609a 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
@@ -22,9 +22,9 @@ package io.druid.sql.calcite.aggregation.builtin;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
@@ -52,6 +52,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class ApproxCountDistinctSqlAggregator implements SqlAggregator
@@ -74,7 +75,8 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
@@ -92,14 +94,15 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
 
     final List<VirtualColumn> virtualColumns = new ArrayList<>();
     final AggregatorFactory aggregatorFactory;
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
 
     if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
-      aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true);
+      aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true);
     } else {
       final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
       final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
       if (inputType == null) {
-        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name);
+        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
       }
 
       final DimensionSpec dimensionSpec;
@@ -108,7 +111,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
         dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
       } else {
         final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
-            StringUtils.format("%s:v", name),
+            Calcites.makePrefixedName(name, "v"),
             inputType,
             plannerContext.getExprMacroTable()
         );
@@ -116,10 +119,20 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
         virtualColumns.add(virtualColumn);
       }
 
-      aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
+      aggregatorFactory = new CardinalityAggregatorFactory(
+          aggregatorName,
+          null,
+          ImmutableList.of(dimensionSpec),
+          false,
+          true
+      );
     }
 
-    return Aggregation.create(virtualColumns, aggregatorFactory);
+    return Aggregation.create(
+        virtualColumns,
+        Collections.singletonList(aggregatorFactory),
+        finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null
+    );
   }
 
   private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
index e490f0f..5d695bb 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
@@ -21,7 +21,6 @@ package io.druid.sql.calcite.aggregation.builtin;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import io.druid.java.util.common.StringUtils;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
@@ -32,6 +31,7 @@ import io.druid.sql.calcite.aggregation.Aggregation;
 import io.druid.sql.calcite.aggregation.Aggregations;
 import io.druid.sql.calcite.aggregation.SqlAggregator;
 import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.PlannerContext;
 import io.druid.sql.calcite.table.RowSignature;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -61,7 +61,8 @@ public class AvgSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
@@ -102,8 +103,8 @@ public class AvgSqlAggregator implements SqlAggregator
       expression = arg.getExpression();
     }
 
-    final String sumName = StringUtils.format("%s:sum", name);
-    final String countName = StringUtils.format("%s:count", name);
+    final String sumName = Calcites.makePrefixedName(name, "sum");
+    final String countName = Calcites.makePrefixedName(name, "count");
     final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
         sumType,
         sumName,
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index 61aac89..8b47037 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -60,7 +60,8 @@ public class CountSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
@@ -87,7 +88,8 @@ public class CountSqlAggregator implements SqlAggregator
             name,
             aggregateCall,
             project,
-            existingAggregations
+            existingAggregations,
+            finalizeAggregations
         );
       } else {
         return null;
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
index 9dff97d..5e96057 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
@@ -60,7 +60,8 @@ public class MaxSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
index 0950cad..81a26d3 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
@@ -60,7 +60,8 @@ public class MinSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
index f807652..61a36c8 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
@@ -60,7 +60,8 @@ public class SumSqlAggregator implements SqlAggregator
       final String name,
       final AggregateCall aggregateCall,
       final Project project,
-      final List<Aggregation> existingAggregations
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
   )
   {
     if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
index e59c554..df401b3 100644
--- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
@@ -339,21 +339,26 @@ public class Calcites
     return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
   }
 
-  public static String findOutputNamePrefix(final String basePrefix, final NavigableSet<String> strings)
+  public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
   {
     String prefix = basePrefix;
 
-    while (!isUsablePrefix(strings, prefix)) {
+    while (!isUnusedPrefix(prefix, strings)) {
       prefix = "_" + prefix;
     }
 
     return prefix;
   }
 
-  private static boolean isUsablePrefix(final NavigableSet<String> strings, final String prefix)
+  private static boolean isUnusedPrefix(final String prefix, final NavigableSet<String> strings)
   {
     // ":" is one character after "9"
     final NavigableSet<String> subSet = strings.subSet(prefix + "0", true, prefix + ":", false);
     return subSet.isEmpty();
   }
+
+  public static String makePrefixedName(final String prefix, final String suffix)
+  {
+    return StringUtils.format("%s:%s", prefix, suffix);
+  }
 }
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
index 6caab8d..7d0666c 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -88,7 +88,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
   @Override
   public Sequence<Object[]> runQuery()
   {
-    final DruidQuery query = toDruidQuery();
+    // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+    // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+    // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+    final DruidQuery query = toDruidQuery(false);
     if (query != null) {
       return getQueryMaker().runQuery(query);
     } else {
@@ -116,9 +120,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
 
   @Nullable
   @Override
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
-    final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
+    // Must finalize aggregations on subqueries.
+
+    final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
     if (subQuery == null) {
       return null;
     }
@@ -128,7 +134,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
         new QueryDataSource(subQuery.toGroupByQuery()),
         sourceRowSignature,
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        finalizeAggregations
     );
   }
 
@@ -142,7 +149,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
             sourceRel.getRowType()
         ),
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        false
     );
   }
 
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
index 2980126..e6d5ba5 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
@@ -131,7 +131,8 @@ public class DruidQuery
       final DataSource dataSource,
       final RowSignature sourceRowSignature,
       final PlannerContext plannerContext,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     this.dataSource = dataSource;
@@ -142,7 +143,7 @@ public class DruidQuery
     // Now the fun begins.
     this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext);
     this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature);
-    this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder);
+    this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations);
 
     final RowSignature sortingInputRowSignature;
 
@@ -222,7 +223,7 @@ public class DruidQuery
     final List<VirtualColumn> virtualColumns = new ArrayList<>();
     final List<String> rowOrder = new ArrayList<>();
 
-    final String virtualColumnPrefix = Calcites.findOutputNamePrefix(
+    final String virtualColumnPrefix = Calcites.findUnusedPrefix(
         "v",
         new TreeSet<>(sourceRowSignature.getRowOrder())
     );
@@ -254,7 +255,8 @@ public class DruidQuery
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
       final RowSignature sourceRowSignature,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     final Aggregate aggregate = partialQuery.getAggregate();
@@ -269,7 +271,8 @@ public class DruidQuery
         partialQuery,
         plannerContext,
         sourceRowSignature,
-        rexBuilder
+        rexBuilder,
+        finalizeAggregations
     );
 
     final RowSignature aggregateRowSignature = RowSignature.from(
@@ -428,7 +431,7 @@ public class DruidQuery
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<DimensionExpression> dimensions = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
     int outputNameCounter = 0;
 
     for (int i : aggregate.getGroupSet()) {
@@ -460,10 +463,13 @@ public class DruidQuery
   /**
    * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
    *
-   * @param partialQuery       partial query
-   * @param plannerContext     planner context
-   * @param sourceRowSignature source row signature
-   * @param rexBuilder         calcite RexBuilder
+   * @param partialQuery         partial query
+   * @param plannerContext       planner context
+   * @param sourceRowSignature   source row signature
+   * @param rexBuilder           calcite RexBuilder
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. Useful for subqueries where Druid's native query layer
+   *                             does not do this automatically.
    *
    * @return aggregations
    *
@@ -473,12 +479,13 @@ public class DruidQuery
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
       final RowSignature sourceRowSignature,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<Aggregation> aggregations = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
 
     for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
       final String aggName = outputNamePrefix + i;
@@ -490,7 +497,8 @@ public class DruidQuery
           partialQuery.getSelectProject(),
           aggCall,
           aggregations,
-          aggName
+          aggName,
+          finalizeAggregations
       );
 
       if (aggregation == null) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
index 0b1088a..57dea9b 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
@@ -93,20 +93,21 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
 
   @Override
   @Nonnull
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     return partialQuery.build(
         druidTable.getDataSource(),
         druidTable.getRowSignature(),
         getPlannerContext(),
-        getCluster().getRexBuilder()
+        getCluster().getRexBuilder(),
+        finalizeAggregations
     );
   }
 
   @Override
   public DruidQuery toDruidQueryForExplaining()
   {
-    return toDruidQuery();
+    return toDruidQuery(false);
   }
 
   @Override
@@ -169,7 +170,11 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
   @Override
   public Sequence<Object[]> runQuery()
   {
-    return getQueryMaker().runQuery(toDruidQuery());
+    // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+    // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+    // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+    return getQueryMaker().runQuery(toDruidQuery(false));
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
index 7739c9b..12053a7 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
@@ -76,12 +76,16 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode imple
    *
    * This method may return null if it knows that this rel will yield an empty result set.
    *
+   * @param finalizeAggregations true if this query should include explicit finalization for all of its
+   *                             aggregators, where required. Useful for subqueries where Druid's native query layer
+   *                             does not do this automatically.
+   *
    * @return query, or null if it is known in advance that this rel will yield an empty result set.
    *
    * @throws CannotBuildQueryException
    */
   @Nullable
-  public abstract DruidQuery toDruidQuery();
+  public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
 
   /**
    * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
index 65fe55f..f54fd51 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
@@ -143,10 +143,10 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
 
   @Nullable
   @Override
-  public DruidQuery toDruidQuery()
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     final DruidRel rel = getLeftRelWithFilter();
-    return rel != null ? rel.toDruidQuery() : null;
+    return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
   }
 
   @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
index 9483fa2..5adfacf 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -278,10 +278,11 @@ public class PartialDruidQuery
       final DataSource dataSource,
       final RowSignature sourceRowSignature,
       final PlannerContext plannerContext,
-      final RexBuilder rexBuilder
+      final RexBuilder rexBuilder,
+      final boolean finalizeAggregations
   )
   {
-    return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder);
+    return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
   }
 
   public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
index 7c44c7b..642c886 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
@@ -107,7 +107,7 @@ public class DruidRules
     {
       super(
           operand(relClass, operand(DruidRel.class, any())),
-          StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage)
+          StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
       );
       this.stage = stage;
       this.f = f;
@@ -261,7 +261,7 @@ public class DruidRules
 
     public DruidOuterQueryRule(final RelOptRuleOperand op, final String description)
     {
-      super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description));
+      super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description));
     }
 
     @Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
index 368bfac..c42fba6 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
@@ -57,7 +57,8 @@ public class GroupByRules
       final Project project,
       final AggregateCall call,
       final List<Aggregation> existingAggregations,
-      final String name
+      final String name,
+      final boolean finalizeAggregations
   )
   {
     final DimFilter filter;
@@ -125,7 +126,8 @@ public class GroupByRules
         name,
         call,
         project,
-        existingAggregationsWithSameFilter
+        existingAggregationsWithSameFilter,
+        finalizeAggregations
     );
 
     if (retVal == null) {
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index 7a70341..d8b0156 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -50,6 +50,7 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
 import io.druid.query.aggregation.LongMinAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ExpressionPostAggregator;
@@ -2560,8 +2561,10 @@ public class CalciteQueryTest extends CalciteTestBase
                         .setInterval(QSS(Filtration.eternity()))
                         .setGranularity(Granularities.ALL)
                         .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
-                        .setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"),
-                                            new FloatMaxAggregatorFactory("a1", "m1"))
+                        .setAggregatorSpecs(
+                            new FloatMinAggregatorFactory("a0", "m1"),
+                            new FloatMaxAggregatorFactory("a1", "m1")
+                        )
                         .setPostAggregatorSpecs(ImmutableList.of(EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")))
                         .setLimitSpec(
                             new DefaultLimitSpec(
@@ -2602,8 +2605,10 @@ public class CalciteQueryTest extends CalciteTestBase
                         .setInterval(QSS(Filtration.eternity()))
                         .setGranularity(Granularities.ALL)
                         .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
-                        .setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"),
-                                            new FloatMaxAggregatorFactory("a1", "m1"))
+                        .setAggregatorSpecs(
+                            new FloatMinAggregatorFactory("a0", "m1"),
+                            new FloatMaxAggregatorFactory("a1", "m1")
+                        )
                         .setPostAggregatorSpecs(
                             ImmutableList.of(
                                 EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")
@@ -4385,6 +4390,74 @@ public class CalciteQueryTest extends CalciteTestBase
   }
 
   @Test
+  public void testAvgDailyCountDistinct() throws Exception
+  {
+    testQuery(
+        "SELECT\n"
+        + "  AVG(u)\n"
+        + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            new QueryDataSource(
+                                GroupByQuery.builder()
+                                            .setDataSource(CalciteTests.DATASOURCE1)
+                                            .setInterval(QSS(Filtration.eternity()))
+                                            .setGranularity(Granularities.ALL)
+                                            .setVirtualColumns(
+                                                EXPRESSION_VIRTUAL_COLUMN(
+                                                    "d0:v",
+                                                    "timestamp_floor(\"__time\",'P1D',null,'UTC')",
+                                                    ValueType.LONG
+                                                )
+                                            )
+                                            .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
+                                            .setAggregatorSpecs(
+                                                AGGS(
+                                                    new CardinalityAggregatorFactory(
+                                                        "a0:a",
+                                                        null,
+                                                        DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)),
+                                                        false,
+                                                        true
+                                                    )
+                                                )
+                                            )
+                                            .setPostAggregatorSpecs(
+                                                ImmutableList.of(
+                                                    new HyperUniqueFinalizingPostAggregator("a0", "a0:a")
+                                                )
+                                            )
+                                            .setContext(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                            )
+                        )
+                        .setInterval(QSS(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(AGGS(
+                            new LongSumAggregatorFactory("_a0:sum", "a0"),
+                            new CountAggregatorFactory("_a0:count")
+                        ))
+                        .setPostAggregatorSpecs(
+                            ImmutableList.of(
+                                new ArithmeticPostAggregator(
+                                    "_a0",
+                                    "quotient",
+                                    ImmutableList.of(
+                                        new FieldAccessPostAggregator(null, "_a0:sum"),
+                                        new FieldAccessPostAggregator(null, "_a0:count")
+                                    )
+                                )
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{1L})
+    );
+  }
+
+  @Test
   public void testTopNFilterJoin() throws Exception
   {
     DimFilter filter = NullHandling.replaceWithDefault() ?
@@ -6897,7 +6970,10 @@ public class CalciteQueryTest extends CalciteTestBase
                         .setAggregatorSpecs(
                             AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2"))
                         )
-                        .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG("p0", "(\"a1\" / \"a0\")")))
+                        .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG(
+                            "p0",
+                            "(\"a1\" / \"a0\")"
+                        )))
                         .setLimitSpec(
                             new DefaultLimitSpec(
                                 Collections.singletonList(
diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
index beb503d..9542b03 100644
--- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
@@ -40,16 +40,16 @@ public class CalcitesTest extends CalciteTestBase
   }
 
   @Test
-  public void testFindOutputNamePrefix()
+  public void testFindUnusedPrefix()
   {
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
-    Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
-    Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
-    Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
-    Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
-    Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org