You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/26 23:15:12 UTC
[incubator-druid] branch 0.12.3 updated: [Backport] SQL: Finalize aggregations for inner queries when necessary. (#6229)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
new 98234e8 [Backport] SQL: Finalize aggregations for inner queries when necessary. (#6229)
98234e8 is described below
commit 98234e87741b577e451b9128dc9e2c674de41208
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Sun Aug 26 16:15:08 2018 -0700
[Backport] SQL: Finalize aggregations for inner queries when necessary. (#6229)
* SQL: Finalize aggregations for inner queries when necessary. (#6221)
* SQL: Finalize aggregations for inner queries when necessary.
Fixes #5779.
* Fixed test method name.
* Fix test for 0.12 branch.
---
.../histogram/sql/QuantileSqlAggregator.java | 5 +-
.../calcite/aggregation/DimensionExpression.java | 4 +-
.../sql/calcite/aggregation/SqlAggregator.java | 6 +-
.../builtin/ApproxCountDistinctSqlAggregator.java | 27 ++++++---
.../aggregation/builtin/AvgSqlAggregator.java | 9 +--
.../aggregation/builtin/CountSqlAggregator.java | 6 +-
.../aggregation/builtin/MaxSqlAggregator.java | 3 +-
.../aggregation/builtin/MinSqlAggregator.java | 3 +-
.../aggregation/builtin/SumSqlAggregator.java | 3 +-
.../io/druid/sql/calcite/planner/Calcites.java | 11 +++-
.../druid/sql/calcite/rel/DruidOuterQueryRel.java | 18 ++++--
.../java/io/druid/sql/calcite/rel/DruidQuery.java | 34 +++++++----
.../io/druid/sql/calcite/rel/DruidQueryRel.java | 13 ++--
.../java/io/druid/sql/calcite/rel/DruidRel.java | 6 +-
.../io/druid/sql/calcite/rel/DruidSemiJoin.java | 4 +-
.../druid/sql/calcite/rel/PartialDruidQuery.java | 5 +-
.../java/io/druid/sql/calcite/rule/DruidRules.java | 4 +-
.../io/druid/sql/calcite/rule/GroupByRules.java | 6 +-
.../io/druid/sql/calcite/CalciteQueryTest.java | 69 ++++++++++++++++++++++
.../io/druid/sql/calcite/planner/CalcitesTest.java | 20 +++----
20 files changed, 192 insertions(+), 64 deletions(-)
diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index b9c8d3d..28c0d65 100644
--- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@@ -62,6 +63,7 @@ public class QuantileSqlAggregator implements SqlAggregator
return FUNCTION_INSTANCE;
}
+ @Nullable
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
@@ -70,7 +72,8 @@ public class QuantileSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
final DruidExpression input = Expressions.toDruidExpression(
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
index d5da02d..abc697c 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
@@ -20,13 +20,13 @@
package io.druid.sql.calcite.aggregation;
import com.google.common.collect.ImmutableList;
-import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
import javax.annotation.Nullable;
import java.util.List;
@@ -85,7 +85,7 @@ public class DimensionExpression
@Nullable
public String getVirtualColumnName()
{
- return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName);
+ return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v");
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
index e6983ff..dcf2c4e 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -53,6 +53,9 @@ public interface SqlAggregator
* @param project project that should be applied before aggregation; may be null
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
* ignored if you do not want to re-use existing aggregations.
+ * @param finalizeAggregations true if this query should include explicit finalization for all of its
+ * aggregators, where required. This is set for subqueries where Druid's native query
+ * layer does not do this automatically.
*
* @return aggregation, or null if the call cannot be translated
*/
@@ -64,6 +67,7 @@ public interface SqlAggregator
String name,
AggregateCall aggregateCall,
Project project,
- List<Aggregation> existingAggregations
+ List<Aggregation> existingAggregations,
+ boolean finalizeAggregations
);
}
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
index 161c3ef..0abdb8a 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
@@ -22,9 +22,9 @@ package io.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@@ -52,6 +52,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class ApproxCountDistinctSqlAggregator implements SqlAggregator
@@ -74,7 +75,8 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
@@ -92,14 +94,15 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
final List<VirtualColumn> virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory;
+ final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
- aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true);
+ aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true);
} else {
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
if (inputType == null) {
- throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name);
+ throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
}
final DimensionSpec dimensionSpec;
@@ -108,7 +111,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
- StringUtils.format("%s:v", name),
+ Calcites.makePrefixedName(name, "v"),
inputType,
plannerContext.getExprMacroTable()
);
@@ -116,10 +119,20 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
virtualColumns.add(virtualColumn);
}
- aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
+ aggregatorFactory = new CardinalityAggregatorFactory(
+ aggregatorName,
+ null,
+ ImmutableList.of(dimensionSpec),
+ false,
+ true
+ );
}
- return Aggregation.create(virtualColumns, aggregatorFactory);
+ return Aggregation.create(
+ virtualColumns,
+ Collections.singletonList(aggregatorFactory),
+ finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null
+ );
}
private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
index 9948c96..32de829 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
@@ -21,7 +21,6 @@ package io.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@@ -32,6 +31,7 @@ import io.druid.sql.calcite.aggregation.Aggregation;
import io.druid.sql.calcite.aggregation.Aggregations;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
@@ -61,7 +61,8 @@ public class AvgSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
@@ -102,8 +103,8 @@ public class AvgSqlAggregator implements SqlAggregator
expression = arg.getExpression();
}
- final String sumName = StringUtils.format("%s:sum", name);
- final String countName = StringUtils.format("%s:count", name);
+ final String sumName = Calcites.makePrefixedName(name, "sum");
+ final String countName = Calcites.makePrefixedName(name, "count");
final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
sumType,
sumName,
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index f8a359b..d82e581 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -60,7 +60,8 @@ public class CountSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
@@ -87,7 +88,8 @@ public class CountSqlAggregator implements SqlAggregator
name,
aggregateCall,
project,
- existingAggregations
+ existingAggregations,
+ finalizeAggregations
);
} else {
return null;
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
index 8be9bdf..6072c11 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
@@ -60,7 +60,8 @@ public class MaxSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
index f23f2d6..fb6d009 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
@@ -60,7 +60,8 @@ public class MinSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
index bea1f25..6b7a9ba 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
@@ -60,7 +60,8 @@ public class SumSqlAggregator implements SqlAggregator
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
index ac566a6..bf39def 100644
--- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
@@ -297,21 +297,26 @@ public class Calcites
return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
}
- public static String findOutputNamePrefix(final String basePrefix, final NavigableSet<String> strings)
+ public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
{
String prefix = basePrefix;
- while (!isUsablePrefix(strings, prefix)) {
+ while (!isUnusedPrefix(prefix, strings)) {
prefix = "_" + prefix;
}
return prefix;
}
- private static boolean isUsablePrefix(final NavigableSet<String> strings, final String prefix)
+ private static boolean isUnusedPrefix(final String prefix, final NavigableSet<String> strings)
{
// ":" is one character after "9"
final NavigableSet<String> subSet = strings.subSet(prefix + "0", true, prefix + ":", false);
return subSet.isEmpty();
}
+
+ public static String makePrefixedName(final String prefix, final String suffix)
+ {
+ return StringUtils.format("%s:%s", prefix, suffix);
+ }
}
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
index b54cf2d..56e80c9 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -88,7 +88,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
@Override
public Sequence<Object[]> runQuery()
{
- final DruidQuery query = toDruidQuery();
+ // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+ // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+ // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+ final DruidQuery query = toDruidQuery(false);
if (query != null) {
return getQueryMaker().runQuery(query);
} else {
@@ -116,9 +120,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
@Nullable
@Override
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
- final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
+ // Must finalize aggregations on subqueries.
+
+ final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
if (subQuery == null) {
return null;
}
@@ -128,7 +134,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
new QueryDataSource(subQuery.toGroupByQuery()),
sourceRowSignature,
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ finalizeAggregations
);
}
@@ -142,7 +149,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
sourceRel.getRowType()
),
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ false
);
}
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
index bca4481..9740f68 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
@@ -115,7 +115,8 @@ public class DruidQuery
final DataSource dataSource,
final RowSignature sourceRowSignature,
final PlannerContext plannerContext,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
this.dataSource = dataSource;
@@ -126,7 +127,7 @@ public class DruidQuery
// Now the fun begins.
this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext);
this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature);
- this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder);
+ this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations);
if (this.selectProjection != null) {
this.outputRowSignature = this.selectProjection.getOutputRowSignature();
@@ -199,7 +200,7 @@ public class DruidQuery
final List<VirtualColumn> virtualColumns = new ArrayList<>();
final List<String> rowOrder = new ArrayList<>();
- final String virtualColumnPrefix = Calcites.findOutputNamePrefix(
+ final String virtualColumnPrefix = Calcites.findUnusedPrefix(
"v",
new TreeSet<>(sourceRowSignature.getRowOrder())
);
@@ -231,7 +232,8 @@ public class DruidQuery
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
final Aggregate aggregate = partialQuery.getAggregate();
@@ -246,7 +248,8 @@ public class DruidQuery
partialQuery,
plannerContext,
sourceRowSignature,
- rexBuilder
+ rexBuilder,
+ finalizeAggregations
);
final RowSignature aggregateRowSignature = RowSignature.from(
@@ -340,7 +343,7 @@ public class DruidQuery
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
- final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
int outputNameCounter = 0;
for (int i : aggregate.getGroupSet()) {
@@ -372,10 +375,13 @@ public class DruidQuery
/**
* Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
*
- * @param partialQuery partial query
- * @param plannerContext planner context
- * @param sourceRowSignature source row signature
- * @param rexBuilder calcite RexBuilder
+ * @param partialQuery partial query
+ * @param plannerContext planner context
+ * @param sourceRowSignature source row signature
+ * @param rexBuilder calcite RexBuilder
+ * @param finalizeAggregations true if this query should include explicit finalization for all of its
+ * aggregators, where required. Useful for subqueries where Druid's native query layer
+ * does not do this automatically.
*
* @return aggregations
*
@@ -385,12 +391,13 @@ public class DruidQuery
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
- final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
@@ -402,7 +409,8 @@ public class DruidQuery
partialQuery.getSelectProject(),
aggCall,
aggregations,
- aggName
+ aggName,
+ finalizeAggregations
);
if (aggregation == null) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
index 5d0ea43..c304e5b 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
@@ -93,20 +93,21 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
@Override
@Nonnull
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
return partialQuery.build(
druidTable.getDataSource(),
druidTable.getRowSignature(),
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ finalizeAggregations
);
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
- return toDruidQuery();
+ return toDruidQuery(false);
}
@Override
@@ -169,7 +170,11 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
@Override
public Sequence<Object[]> runQuery()
{
- return getQueryMaker().runQuery(toDruidQuery());
+ // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+ // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+ // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+ return getQueryMaker().runQuery(toDruidQuery(false));
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
index 32ff120..9244b50 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
@@ -76,12 +76,16 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode imple
*
* This method may return null if it knows that this rel will yield an empty result set.
*
+ * @param finalizeAggregations true if this query should include explicit finalization for all of its
+ * aggregators, where required. Useful for subqueries where Druid's native query layer
+ * does not do this automatically.
+ *
* @return query, or null if it is known in advance that this rel will yield an empty result set.
*
* @throws CannotBuildQueryException
*/
@Nullable
- public abstract DruidQuery toDruidQuery();
+ public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
/**
* Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
index 5d6abfc..ecfd8bb 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
@@ -142,10 +142,10 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
@Nullable
@Override
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel rel = getLeftRelWithFilter();
- return rel != null ? rel.toDruidQuery() : null;
+ return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
index 263c2e0..01c960c 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -246,10 +246,11 @@ public class PartialDruidQuery
final DataSource dataSource,
final RowSignature sourceRowSignature,
final PlannerContext plannerContext,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
- return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder);
+ return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
}
public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
index 5ec46ab..b565aba 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
@@ -101,7 +101,7 @@ public class DruidRules
{
super(
operand(relClass, operand(DruidRel.class, any())),
- StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage)
+ StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
);
this.stage = stage;
this.f = f;
@@ -229,7 +229,7 @@ public class DruidRules
public DruidOuterQueryRule(final RelOptRuleOperand op, final String description)
{
- super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description));
+ super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description));
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
index 1308981..e3b52b4 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
@@ -57,7 +57,8 @@ public class GroupByRules
final Project project,
final AggregateCall call,
final List<Aggregation> existingAggregations,
- final String name
+ final String name,
+ final boolean finalizeAggregations
)
{
final DimFilter filter;
@@ -125,7 +126,8 @@ public class GroupByRules
name,
call,
project,
- existingAggregationsWithSameFilter
+ existingAggregationsWithSameFilter,
+ finalizeAggregations
);
if (retVal == null) {
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index 19722de..5bfa8a3 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -48,6 +48,7 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
@@ -4033,6 +4034,74 @@ public class CalciteQueryTest extends CalciteTestBase
}
@Test
+ public void testAvgDailyCountDistinct() throws Exception
+ {
+ testQuery(
+ "SELECT\n"
+ + " AVG(u)\n"
+ + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(QSS(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setVirtualColumns(
+ EXPRESSION_VIRTUAL_COLUMN(
+ "d0:v",
+ "timestamp_floor(\"__time\",'P1D','','UTC')",
+ ValueType.LONG
+ )
+ )
+ .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
+ .setAggregatorSpecs(
+ AGGS(
+ new CardinalityAggregatorFactory(
+ "a0:a",
+ null,
+ DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)),
+ false,
+ true
+ )
+ )
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new HyperUniqueFinalizingPostAggregator("a0", "a0:a")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ )
+ .setInterval(QSS(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(AGGS(
+ new LongSumAggregatorFactory("_a0:sum", "a0"),
+ new CountAggregatorFactory("_a0:count")
+ ))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "_a0",
+ "quotient",
+ ImmutableList.of(
+ new FieldAccessPostAggregator(null, "_a0:sum"),
+ new FieldAccessPostAggregator(null, "_a0:count")
+ )
+ )
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{1L})
+ );
+ }
+
+ @Test
public void testTopNFilterJoin() throws Exception
{
// Filters on top N values of some dimension by using an inner join.
diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
index 937ba7f..ae0cbf2 100644
--- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
@@ -41,16 +41,16 @@ public class CalcitesTest extends CalciteTestBase
}
@Test
- public void testFindOutputNamePrefix()
+ public void testFindUnusedPrefix()
{
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
- Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
- Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
- Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
- Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org