You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/04 10:20:52 UTC
[ignite] branch sql-calcite updated: IGNITE-13543 Calcite. Introduce sort-based aggregates
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new dcf6913 IGNITE-13543 Calcite. Introduce sort-based aggregates
dcf6913 is described below
commit dcf6913677abda06e4244a55e8f7cc9e218eb7c8
Author: tledkov <tl...@gridgain.com>
AuthorDate: Thu Mar 4 13:20:33 2021 +0300
IGNITE-13543 Calcite. Introduce sort-based aggregates
---
.../query/calcite/exec/LogicalRelImplementor.java | 126 ++++-
.../query/calcite/exec/exp/ExpressionFactory.java | 8 +-
.../calcite/exec/exp/ExpressionFactoryImpl.java | 9 +-
.../calcite/exec/exp/agg/AccumulatorsFactory.java | 16 +-
.../query/calcite/exec/exp/agg/AggregateType.java} | 28 +-
.../{AggregateNode.java => HashAggregateNode.java} | 18 +-
.../query/calcite/exec/rel/SortAggregateNode.java | 304 ++++++++++++
.../query/calcite/externalize/RelJson.java | 1 +
.../calcite/metadata/IgniteMdDistinctRowCount.java | 52 +++
.../query/calcite/metadata/IgniteMetadata.java | 1 +
.../query/calcite/metadata/cost/IgniteCost.java | 3 +
.../processors/query/calcite/prepare/Cloner.java | 29 +-
.../query/calcite/prepare/IgniteRelShuttle.java | 31 +-
.../query/calcite/prepare/PlannerPhase.java | 6 +-
.../query/calcite/rel/IgniteAggregate.java | 274 +++--------
...niteAggregate.java => IgniteAggregateBase.java} | 122 ++---
.../query/calcite/rel/IgniteHashAggregate.java | 98 ++++
...pAggregate.java => IgniteMapHashAggregate.java} | 66 +--
.../query/calcite/rel/IgniteMapSortAggregate.java | 137 ++++++
...gregate.java => IgniteReduceAggregateBase.java} | 54 +--
.../calcite/rel/IgniteReduceHashAggregate.java | 104 +++++
.../calcite/rel/IgniteReduceSortAggregate.java | 123 +++++
.../query/calcite/rel/IgniteRelVisitor.java | 21 +-
.../query/calcite/rel/IgniteSortAggregate.java | 151 ++++++
...erRule.java => HashAggregateConverterRule.java} | 12 +-
...erRule.java => SortAggregateConverterRule.java} | 36 +-
.../query/calcite/rule/UnionConverterRule.java | 5 +-
.../processors/query/calcite/trait/TraitUtils.java | 40 ++
.../query/calcite/AggregatesIntegrationTest.java | 4 +
.../calcite/SortAggregateIntegrationTest.java | 154 ++++++
.../calcite/exec/rel/AbstractExecutionTest.java | 47 +-
.../query/calcite/exec/rel/BaseAggregateTest.java | 448 ++++++++++++++++++
.../query/calcite/exec/rel/ExecutionTest.java | 511 +-------------------
.../exec/rel/HashAggregateExecutionTest.java | 138 ++++++
.../rel/HashAggregateSingleGroupExecutionTest.java | 518 +++++++++++++++++++++
.../exec/rel/SortAggregateExecutionTest.java | 146 ++++++
.../query/calcite/planner/AbstractPlannerTest.java | 5 +
.../calcite/planner/AggregatePlannerTest.java | 397 ++++++++++++++++
.../query/calcite/planner/HashAggregateTest.java | 105 +++++
.../query/calcite/planner/PlannerTest.java | 93 ----
.../ignite/testsuites/ExecutionTestSuite.java | 6 +
.../ignite/testsuites/IgniteCalciteTestSuite.java | 6 +-
.../apache/ignite/testsuites/PlannerTestSuite.java | 4 +-
43 files changed, 3393 insertions(+), 1064 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e9a1ec3..6e628e7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -36,7 +36,9 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
@@ -54,19 +56,22 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNo
import org.apache.ignite.internal.processors.query.calcite.exec.rel.UnionAllNode;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -433,8 +438,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteAggregate rel) {
- AggregateNode.AggregateType type = AggregateNode.AggregateType.SINGLE;
+ @Override public Node<Row> visit(IgniteHashAggregate rel) {
+ AggregateType type = AggregateType.SINGLE;
RelDataType rowType = rel.getRowType();
RelDataType inputType = rel.getInput().getRowType();
@@ -443,7 +448,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
type, rel.getAggCallList(), inputType);
RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
+ HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
@@ -453,8 +458,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteMapAggregate rel) {
- AggregateNode.AggregateType type = AggregateNode.AggregateType.MAP;
+ @Override public Node<Row> visit(IgniteMapHashAggregate rel) {
+ AggregateType type = AggregateType.MAP;
RelDataType rowType = rel.getRowType();
RelDataType inputType = rel.getInput().getRowType();
@@ -463,7 +468,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
type, rel.getAggCallList(), inputType);
RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
+ HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
@@ -473,8 +478,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteReduceAggregate rel) {
- AggregateNode.AggregateType type = AggregateNode.AggregateType.REDUCE;
+ @Override public Node<Row> visit(IgniteReduceHashAggregate rel) {
+ AggregateType type = AggregateType.REDUCE;
RelDataType rowType = rel.getRowType();
@@ -482,7 +487,102 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
type, rel.aggregateCalls(), null);
RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+ HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+
+ Node<Row> input = visit(rel.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteSortAggregate rel) {
+ AggregateType type = AggregateType.SINGLE;
+
+ RelDataType rowType = rel.getRowType();
+ RelDataType inputType = rel.getInput().getRowType();
+
+ Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+ type,
+ rel.getAggCallList(),
+ inputType
+ );
+
+ RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+ SortAggregateNode<Row> node = new SortAggregateNode<>(
+ ctx,
+ rowType,
+ type,
+ rel.getGroupSet(),
+ accFactory,
+ rowFactory,
+ expressionFactory.comparator(rel.collation())
+ );
+
+ Node<Row> input = visit(rel.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteMapSortAggregate rel) {
+ AggregateType type = AggregateType.MAP;
+
+ RelDataType rowType = rel.getRowType();
+ RelDataType inputType = rel.getInput().getRowType();
+
+ Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+ type,
+ rel.getAggCallList(),
+ inputType
+ );
+
+ RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+ SortAggregateNode<Row> node = new SortAggregateNode<>(
+ ctx,
+ rowType,
+ type,
+ rel.getGroupSet(),
+ accFactory,
+ rowFactory,
+ expressionFactory.comparator(rel.collation())
+ );
+
+ Node<Row> input = visit(rel.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteReduceSortAggregate rel) {
+ AggregateType type = AggregateType.REDUCE;
+
+ RelDataType rowType = rel.getRowType();
+
+ Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+ type,
+ rel.aggregateCalls(),
+ null
+ );
+
+ RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+ SortAggregateNode<Row> node = new SortAggregateNode<>(
+ ctx,
+ rowType,
+ type,
+ rel.groupSet(),
+ accFactory,
+ rowFactory,
+ expressionFactory.comparator(rel.collation())
+ );
Node<Row> input = visit(rel.getInput());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
index 7361ca2..7255955 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
@@ -31,14 +31,18 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
/**
* Expression factory.
*/
public interface ExpressionFactory<Row> {
/** */
- Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(AggregateNode.AggregateType type, List<AggregateCall> calls, RelDataType rowType);
+ Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(
+ AggregateType type,
+ List<AggregateCall> calls,
+ RelDataType rowType
+ );
/**
* Creates a comparator for given data type and collations. Mainly used for sorted exchange.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index c1a883f..ef10723 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -61,7 +61,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorsFactory;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
@@ -102,8 +102,11 @@ public class ExpressionFactoryImpl<Row> implements ExpressionFactory<Row> {
}
/** {@inheritDoc} */
- @Override public Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(AggregateNode.AggregateType type,
- List<AggregateCall> calls, RelDataType rowType) {
+ @Override public Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(
+ AggregateType type,
+ List<AggregateCall> calls,
+ RelDataType rowType
+ ) {
return new AccumulatorsFactory<>(ctx, type, calls, rowType);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
index 7cd8ece..afc803c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
@@ -50,7 +50,6 @@ import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -146,8 +145,12 @@ public class AccumulatorsFactory<Row> implements Supplier<List<AccumulatorWrappe
private final List<WrapperPrototype> prototypes;
/** */
- public AccumulatorsFactory(ExecutionContext<Row> ctx, AggregateType type,
- List<AggregateCall> aggCalls, RelDataType inputRowType) {
+ public AccumulatorsFactory(
+ ExecutionContext<Row> ctx,
+ AggregateType type,
+ List<AggregateCall> aggCalls,
+ RelDataType inputRowType
+ ) {
this.ctx = ctx;
this.type = type;
this.inputRowType = inputRowType;
@@ -265,9 +268,12 @@ public class AccumulatorsFactory<Row> implements Supplier<List<AccumulatorWrappe
private final RowHandler<Row> handler;
/** */
- AccumulatorWrapperImpl(Accumulator accumulator, AggregateCall call,
+ AccumulatorWrapperImpl(
+ Accumulator accumulator,
+ AggregateCall call,
Function<Object[], Object[]> inAdapter,
- Function<Object, Object> outAdapter) {
+ Function<Object, Object> outAdapter
+ ) {
this.accumulator = accumulator;
this.inAdapter = inAdapter;
this.outAdapter = outAdapter;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
similarity index 53%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
index ad329f1..c743e2a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
@@ -15,24 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
-
-import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;
/**
- * Calcite tests.
+ *
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- PlannerTest.class,
- CorrelatedNestedLoopJoinPlannerTest.class,
- TableSpoolPlannerTest.class,
- IndexSpoolPlannerTest.class
-})
-public class PlannerTestSuite {
+public enum AggregateType {
+ /** Map phase. */
+ MAP,
+
+ /** Reduce phase. */
+ REDUCE,
+
+ /** Single phase aggregate. */
+ SINGLE
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 08743da..1f4728e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
@@ -44,7 +45,7 @@ import static org.apache.ignite.internal.processors.query.calcite.util.Commons.n
/**
*
*/
-public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
/** */
private final AggregateType type;
@@ -72,7 +73,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
/**
* @param ctx Execution context.
*/
- public AggregateNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, List<ImmutableBitSet> grpSets,
+ public HashAggregateNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, List<ImmutableBitSet> grpSets,
Supplier<List<AccumulatorWrapper<Row>>> accFactory, RowFactory<Row> rowFactory) {
super(ctx, rowType);
@@ -215,19 +216,6 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
}
/** */
- @SuppressWarnings("PublicInnerClass")
- public enum AggregateType {
- /** Map phase. */
- MAP,
-
- /** Reduce phase. */
- REDUCE,
-
- /** Single phase aggregate. */
- SINGLE
- }
-
- /** */
private class Grouping {
/** */
private final byte grpId;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
new file mode 100644
index 0000000..af63e34
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class SortAggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+ /** */
+ private final AggregateType type;
+
+ /** */
+ private final Supplier<List<AccumulatorWrapper<Row>>> accFactory;
+
+ /** */
+ private final RowFactory<Row> rowFactory;
+
+ /** */
+ private final ImmutableBitSet grpSet;
+
+ /** */
+ private final Comparator<Row> comp;
+
+ /** */
+ private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+ /** */
+ private Row prevRow;
+
+ /** */
+ private Group grp;
+
+ /** */
+ private int requested;
+
+ /** */
+ private int waiting;
+
+ /** */
+ private int cmpRes;
+
+ /**
+ * @param ctx Execution context.
+ */
+ public SortAggregateNode(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ AggregateType type,
+ ImmutableBitSet grpSet,
+ Supplier<List<AccumulatorWrapper<Row>>> accFactory,
+ RowFactory<Row> rowFactory,
+ Comparator<Row> comp
+ ) {
+ super(ctx, rowType);
+
+ this.type = type;
+ this.accFactory = accFactory;
+ this.rowFactory = rowFactory;
+ this.grpSet = grpSet;
+ this.comp = comp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) throws Exception {
+ assert !F.isEmpty(sources()) && sources().size() == 1;
+ assert rowsCnt > 0 && requested == 0;
+
+ checkState();
+
+ requested = rowsCnt;
+
+ if (!outBuf.isEmpty())
+ doPush();
+
+ if (waiting == 0) {
+ waiting = IN_BUFFER_SIZE;
+
+ source().request(IN_BUFFER_SIZE);
+ }
+ else if (waiting < 0)
+ downstream().end();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void push(Row row) throws Exception {
+ assert downstream() != null;
+ assert waiting > 0;
+
+ checkState();
+
+ waiting--;
+
+ if (grp != null) {
+ int cmp = comp.compare(row, prevRow);
+
+ if (cmp == 0)
+ grp.add(row);
+ else {
+ if (cmpRes == 0)
+ cmpRes = cmp;
+ else
+ assert Integer.signum(cmp) == Integer.signum(cmpRes) : "Input not sorted";
+
+ outBuf.add(grp.row());
+
+ grp = newGroup(row);
+
+ doPush();
+ }
+ }
+ else
+ grp = newGroup(row);
+
+ prevRow = row;
+
+ if (waiting == 0 && requested > 0) {
+ waiting = IN_BUFFER_SIZE;
+
+ context().execute(() -> source().request(IN_BUFFER_SIZE), this::onError);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void end() throws Exception {
+ assert downstream() != null;
+ assert waiting > 0;
+
+ checkState();
+
+ waiting = -1;
+
+ if (grp != null) {
+ outBuf.add(grp.row());
+
+ doPush();
+ }
+
+ if (requested > 0)
+ downstream().end();
+
+ grp = null;
+ prevRow = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ requested = 0;
+ waiting = 0;
+ grp = null;
+ prevRow = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Downstream<Row> requestDownstream(int idx) {
+ if (idx != 0)
+ throw new IndexOutOfBoundsException();
+
+ return this;
+ }
+
+ /** */
+ private Group newGroup(Row r) {
+ final Object[] grpKeys = new Object[grpSet.cardinality()];
+ List<Integer> fldIdxs = grpSet.asList();
+
+ final RowHandler<Row> rowHandler = rowFactory.handler();
+
+ for (int i = 0; i < grpKeys.length; ++i)
+ grpKeys[i] = rowHandler.get(fldIdxs.get(i), r);
+
+ Group grp = new Group(grpKeys);
+
+ grp.add(r);
+
+ return grp;
+ }
+
+ /** */
+ private void doPush() throws Exception {
+ while (requested > 0 && !outBuf.isEmpty()) {
+ requested--;
+
+ downstream().push(outBuf.poll());
+ }
+ }
+
+ /** */
+ private class Group {
+ /** */
+ private final List<AccumulatorWrapper<Row>> accumWrps;
+
+ /** */
+ private final RowHandler<Row> handler;
+
+ /** */
+ private final Object[] grpKeys;
+
+ /** */
+ private Group(Object[] grpKeys) {
+ this.grpKeys = grpKeys;
+
+ accumWrps = accFactory.get();
+
+ handler = context().rowHandler();
+ }
+
+ /** */
+ private void add(Row row) {
+ if (type == AggregateType.REDUCE)
+ addOnReducer(row);
+ else
+ addOnMapper(row);
+ }
+
+ /** */
+ private Row row() {
+ if (type == AggregateType.MAP)
+ return rowOnMapper();
+ else
+ return rowOnReducer();
+ }
+
+ /** */
+ private void addOnMapper(Row row) {
+ for (AccumulatorWrapper<Row> wrapper : accumWrps)
+ wrapper.add(row);
+ }
+
+ /** */
+ private void addOnReducer(Row row) {
+ List<Accumulator> accums = (List<Accumulator>)handler.get(handler.columnCount(row) - 1, row);
+
+ for (int i = 0; i < accums.size(); i++) {
+ AccumulatorWrapper<Row> wrapper = accumWrps.get(i);
+
+ Accumulator accum = accums.get(i);
+
+ wrapper.apply(accum);
+ }
+ }
+
+ /** */
+ private Row rowOnMapper() {
+ Object[] fields = new Object[grpSet.cardinality() + 1];
+
+ int i = 0;
+
+ for (Object grpKey : grpKeys)
+ fields[i++] = grpKey;
+
+ // Last column is the accumulators collection.
+ fields[i] = Commons.transform(accumWrps, AccumulatorWrapper::accumulator);
+
+ return rowFactory.create(fields);
+ }
+
+ /** */
+ private Row rowOnReducer() {
+ Object[] fields = new Object[grpSet.cardinality() + accumWrps.size()];
+
+ int i = 0;
+
+ for (Object grpKey : grpKeys)
+ fields[i++] = grpKey;
+
+ for (AccumulatorWrapper<Row> accWrp : accumWrps)
+ fields[i++] = accWrp.end();
+
+ return rowFactory.create(fields);
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index cf01709..b91e3e0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -656,6 +656,7 @@ class RelJson {
map.put("type", toJson(node.getType()));
map.put("distinct", node.isDistinct());
map.put("operands", node.getArgList());
+ map.put("filter", node.filterArg);
map.put("name", node.getName());
return map;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java
new file mode 100644
index 0000000..15a668b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/** */
+@SuppressWarnings("unused") // actually all methods are used by runtime generated classes
+public class IgniteMdDistinctRowCount extends RelMdDistinctRowCount {
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.DISTINCT_ROW_COUNT.method, new IgniteMdDistinctRowCount());
+
+ /** {@inheritDoc} */
+ @Override public Double getDistinctRowCount(
+ RelSubset rel,
+ RelMetadataQuery mq,
+ ImmutableBitSet groupKey,
+ RexNode predicate
+ ) {
+ if (groupKey.cardinality() == 0)
+ return 1d;
+
+ double rowCount = mq.getRowCount(rel);
+
+ rowCount *= 1.0 - Math.pow(.5, groupKey.cardinality());
+
+ return rowCount;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index a8a941e..b279c05 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -47,6 +47,7 @@ public class IgniteMetadata {
IgniteMdPredicates.SOURCE,
IgniteMdCollation.SOURCE,
IgniteMdSelectivity.SOURCE,
+ IgniteMdDistinctRowCount.SOURCE,
// Basic providers
DefaultRelMetadataProvider.INSTANCE));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
index a78ff09..60bd64e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@ -37,6 +37,9 @@ public class IgniteCost implements RelOptCost {
/** Cost of a comparison of one row. */
public static final double ROW_COMPARISON_COST = 3;
+ /** Memory cost of an aggregate call. */
+ public static final double AGG_CALL_MEM_COST = 5;
+
/**
* With broadcast distribution each row will be sent to the each distination node,
* thus the total bytes amount will be multiplies of the destination nodes count.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 34e1ee7..2069945 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -19,23 +19,26 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -187,17 +190,31 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteAggregate rel) {
+ @Override public IgniteRel visit(IgniteHashAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapAggregate rel) {
+ @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceAggregate rel) {
+ @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+ return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSortAggregate rel) {
+ return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
+ return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 870962c..7e232f29 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -18,23 +18,27 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -86,17 +90,32 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteAggregate rel) {
+ @Override public IgniteRel visit(IgniteHashAggregate rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSortAggregate rel) {
return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapAggregate rel) {
+ @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceAggregate rel) {
+ @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
return processNode(rel);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 768fc24..58d8667 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -39,14 +39,15 @@ import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
-import org.apache.ignite.internal.processors.query.calcite.rule.AggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.CorrelatedNestedLoopJoinRule;
import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
@@ -159,7 +160,8 @@ public enum PlannerPhase {
ValuesConverterRule.INSTANCE,
LogicalScanConverterRule.INDEX_SCAN,
LogicalScanConverterRule.TABLE_SCAN,
- AggregateConverterRule.INSTANCE,
+ HashAggregateConverterRule.INSTANCE,
+ SortAggregateConverterRule.INSTANCE,
MergeJoinConverterRule.INSTANCE,
NestedLoopJoinConverterRule.INSTANCE,
ProjectConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index e951b34..debbdcc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -17,258 +17,104 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
-import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
/**
*
*/
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
+public abstract class IgniteAggregate extends Aggregate implements IgniteRel {
/** {@inheritDoc} */
- public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ protected IgniteAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ ) {
super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
}
/** {@inheritDoc} */
- public IgniteAggregate(RelInput input) {
+ protected IgniteAggregate(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
}
- /** {@inheritDoc} */
- @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Distribution propagation is based on next rules:
- // 1) Any aggregation is possible on single or broadcast distribution.
- // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
- // and all of input distribution keys are parts of aggregation group and vice versa.
- // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
- RelTraitSet in = inputTraits.get(0);
-
- IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
- RelDistribution.Type distrType = distribution.getType();
-
- switch (distrType) {
- case SINGLETON:
- case BROADCAST_DISTRIBUTED:
- return Pair.of(nodeTraits, ImmutableList.of(in.replace(distribution)));
-
- case RANDOM_DISTRIBUTED:
- if (!groupSet.isEmpty() && isSimple(this)) {
- IgniteDistribution outDistr = hash(range(0, groupSet.cardinality()));
- IgniteDistribution inDistr = hash(groupSet.asList());
-
- return Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in.replace(inDistr)));
- }
-
- break;
-
- case HASH_DISTRIBUTED:
- ImmutableIntList keys = distribution.getKeys();
-
- if (isSimple(this) && groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverseMapping(
- groupSet, getInput().getRowType().getFieldCount());
-
- List<Integer> srcKeys = new ArrayList<>(keys.size());
-
- for (int key : keys) {
- int src = mapping.getSourceOpt(key);
-
- if (src == -1)
- break;
-
- srcKeys.add(src);
- }
-
- if (srcKeys.size() == keys.size())
- return Pair.of(nodeTraits, ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
- }
-
- break;
-
- default:
- break;
- }
-
- return Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single())));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
- return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Aggregate is rewindable if its input is rewindable.
-
- RelTraitSet in = inputTraits.get(0);
-
- RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
- ? RewindabilityTrait.ONE_WAY
- : TraitUtils.rewindability(in);
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), ImmutableList.of(in.replace(rewindability))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Distribution propagation is based on next rules:
- // 1) Any aggregation is possible on single or broadcast distribution.
- // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
- // and all of input distribution keys are parts aggregation group.
- // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
- RelTraitSet in = inputTraits.get(0);
-
- List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
- IgniteDistribution distribution = TraitUtils.distribution(in);
-
- RelDistribution.Type distrType = distribution.getType();
-
- switch (distrType) {
- case SINGLETON:
- case BROADCAST_DISTRIBUTED:
- res.add(Pair.of(nodeTraits.replace(distribution), ImmutableList.of(in)));
-
- break;
-
- case HASH_DISTRIBUTED:
- if (isSimple(this)) {
- ImmutableIntList keys = distribution.getKeys();
-
- if (groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverseMapping(
- groupSet, getInput().getRowType().getFieldCount());
-
- IgniteDistribution outDistr = distribution.apply(mapping);
-
- if (outDistr.getType() == HASH_DISTRIBUTED)
- res.add(Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in)));
- }
- }
-
- break;
-
- case RANDOM_DISTRIBUTED:
- // Map-reduce aggregates
- if (isSimple(this)) {
- res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(random()))));
- res.add(Pair.of(nodeTraits.replace(broadcast()), ImmutableList.of(in.replace(random()))));
- }
-
- break;
-
- default:
- break;
- }
-
- if (!res.isEmpty())
- return res;
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single()))));
- }
+ /**
+ * Estimates the number of rows produced by this aggregate.
+ *
+ * @param mq Metadata query.
+ * @return Distinct row count of the input over {@code groupSet} when the metadata query
+ * provides it, otherwise the estimate of the superclass.
+ */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
+ if (groupsCnt != null)
+ return groupsCnt;
- return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+ // Estimation of the groups count is not available.
+ // Use heuristic estimation for result rows count.
+ return super.estimateRowCount(mq);
}
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
- inTraits));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull RelNode createNode(RelTraitSet outTraits, List<RelTraitSet> inTraits) {
- RelTraitSet in = inTraits.get(0);
-
- if (!isMapReduce(outTraits, in))
- return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
-
- if (U.assertionsEnabled()) {
- ImmutableList<RelTrait> diff = in.difference(outTraits);
-
- assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
+ /**
+ * Estimates the memory needed to keep the state of a single group.
+ * <p>
+ * The estimate is {@code groupSet.cardinality() * AVERAGE_FIELD_SIZE} plus
+ * {@code AGG_CALL_MEM_COST} for every aggregate call. For a DISTINCT call that cost is
+ * multiplied by the average number of input rows per group.
+ *
+ * @param mq Metadata query.
+ * @return Estimated memory for one group.
+ */
+ public double estimateMemoryForGroup(RelMetadataQuery mq) {
+ double mem = groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+ if (!aggCalls.isEmpty()) {
+ double grps = estimateRowCount(mq);
+ // Ask the metadata query for the input row count instead of calling estimateRowCount()
+ // on the input directly, the same way computeSelfCostHash/computeSelfCostSort do.
+ double rows = mq.getRowCount(getInput());
+
+ for (AggregateCall aggCall : aggCalls) {
+ if (aggCall.isDistinct())
+ mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+ else
+ mem += IgniteCost.AGG_CALL_MEM_COST;
+ }
}
- RelNode map = new IgniteMapAggregate(getCluster(), in, convert(getInput(), in), groupSet, groupSets, aggCalls);
- return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, outTraits), groupSet, groupSets, aggCalls, getRowType());
+ return mem;
}
/** */
- private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
- return TraitUtils.distribution(out).satisfies(single())
- && TraitUtils.distribution(in).satisfies(random());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
- getGroupSet(), getGroupSets(), getAggCallList());
+ public RelOptCost computeSelfCostHash(RelOptPlanner planner, RelMetadataQuery mq) {
+ // The planner's cost factory is expected to be an IgniteCostFactory; the cast fails otherwise.
+ IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+ double inRows = mq.getRowCount(getInput());
+ double groups = estimateRowCount(mq);
+
+ // NOTE(review): arguments are presumably (rows, cpu, io, memory, network) - confirm against
+ // IgniteCostFactory.makeCost. The per-group estimate is scaled by the estimated number of groups.
+ return costFactory.makeCost(
+ inRows,
+ inRows * IgniteCost.ROW_PASS_THROUGH_COST,
+ 0,
+ groups * estimateMemoryForGroup(mq),
+ 0
+ );
}
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rows = mq.getRowCount(getInput());
-
- // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
- // currently it's OK to have such a dummy cost because there is no other options
- return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+ /**
+ * Computes the self cost for a sort-based aggregate.
+ * <p>
+ * Unlike {@code computeSelfCostHash}, the per-group estimate is passed as is and is not
+ * multiplied by the number of groups.
+ *
+ * @param planner Planner; its cost factory is cast to {@link IgniteCostFactory}.
+ * @param mq Metadata query.
+ * @return Cost built from the input row count and the estimate for a single group.
+ */
+ public RelOptCost computeSelfCostSort(RelOptPlanner planner, RelMetadataQuery mq) {
+ IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+ double inRows = mq.getRowCount(getInput());
+
+ // NOTE(review): arguments are presumably (rows, cpu, io, memory, network) - confirm against
+ // IgniteCostFactory.makeCost.
+ return costFactory.makeCost(
+ inRows,
+ inRows * IgniteCost.ROW_PASS_THROUGH_COST,
+ 0,
+ estimateMemoryForGroup(mq),
+ 0
+ );
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
similarity index 73%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
index e951b34..2380615 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
@@ -22,22 +22,17 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -59,29 +54,29 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUti
/**
*
*/
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
+public abstract class IgniteAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
/** {@inheritDoc} */
- public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
+ protected IgniteAggregateBase(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ ) {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
}
/** {@inheritDoc} */
- public IgniteAggregate(RelInput input) {
+ protected IgniteAggregateBase(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
}
/** {@inheritDoc} */
- @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
// Distribution propagation is based on next rules:
// 1) Any aggregation is possible on single or broadcast distribution.
// 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
@@ -141,14 +136,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
- return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
// Aggregate is rewindable if its input is rewindable.
RelTraitSet in = inputTraits.get(0);
@@ -161,7 +152,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
// Distribution propagation is based on next rules:
// 1) Any aggregation is possible on single or broadcast distribution.
// 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
@@ -220,16 +214,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits) {
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
inTraits));
}
@@ -247,28 +235,48 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
}
- RelNode map = new IgniteMapAggregate(getCluster(), in, convert(getInput(), in), groupSet, groupSets, aggCalls);
- return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, outTraits), groupSet, groupSets, aggCalls, getRowType());
+ RelNode map = createMapAggregate(
+ getCluster(),
+ in,
+ convert(getInput(), in),
+ groupSet,
+ groupSets,
+ aggCalls);
+
+ return createReduceAggregate(
+ getCluster(),
+ outTraits,
+ convert(map, outTraits),
+ groupSet,
+ groupSets,
+ aggCalls,
+ getRowType());
}
/** */
+ protected abstract RelNode createReduceAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ ImmutableList<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls,
+ RelDataType rowType
+ );
+
+ /**
+ * Creates the map aggregate node of a map-reduce aggregation.
+ *
+ * @param cluster Cluster.
+ * @param traits Traits for the new node.
+ * @param input Input node.
+ * @param groupSet Group set.
+ * @param groupSets Group sets.
+ * @param aggCalls Aggregate calls.
+ * @return Map aggregate node.
+ */
+ protected abstract RelNode createMapAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ ImmutableList<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ );
+
+ /** */
private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
return TraitUtils.distribution(out).satisfies(single())
&& TraitUtils.distribution(in).satisfies(random());
}
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
- getGroupSet(), getGroupSets(), getAggCallList());
- }
-
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rows = mq.getRowCount(getInput());
-
- // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
- // currently it's OK to have such a dummy cost because there is no other options
- return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
new file mode 100644
index 0000000..a0f4613
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+/**
+ *
+ */
+public class IgniteHashAggregate extends IgniteAggregateBase {
+ /** {@inheritDoc} */
+ public IgniteHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ public IgniteHashAggregate(RelInput input) {
+ super(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ return new IgniteHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteHashAggregate(cluster, getTraitSet(), sole(inputs),
+ getGroupSet(), getGroupSets(), getAggCallList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Since it's a hash aggregate it erases collation.
+ return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Since it's a hash aggregate it erases collation.
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ return new IgniteMapHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
+ RelDataType rowType) {
+ return new IgniteReduceHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, getRowType());
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return computeSelfCostHash(planner, mq);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
similarity index 60%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
index 808301c..49bdf9e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
+
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -32,30 +33,39 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
/**
*
*/
-public class IgniteMapAggregate extends Aggregate implements IgniteRel {
- public IgniteMapAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+public class IgniteMapHashAggregate extends IgniteAggregate {
+ /** */
+ public IgniteMapHashAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ ) {
super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
}
/** */
- public IgniteMapAggregate(RelInput input) {
- super(changeTraits(input, IgniteConvention.INSTANCE));
+ public IgniteMapHashAggregate(RelInput input) {
+ super(input);
}
/** {@inheritDoc} */
@Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteMapAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+ return new IgniteMapHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteMapHashAggregate(cluster, getTraitSet(), sole(inputs),
+ getGroupSet(), getGroupSets(), getAggCallList());
}
/** {@inheritDoc} */
@@ -68,37 +78,6 @@ public class IgniteMapAggregate extends Aggregate implements IgniteRel {
return rowType(Commons.typeFactory(getCluster()));
}
- /**
- * @return Map aggregate relational node distribution calculated on the basis of its input and groupingSets.
- * <b>Note</b> that the method returns {@code null} in case the given input cannot be processed in map-reduce
- * style by an aggregate.
- */
- public static IgniteDistribution distribution(IgniteDistribution inputDistr, ImmutableBitSet groupSet,
- List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-
- switch (inputDistr.getType()) {
- case SINGLETON:
- case BROADCAST_DISTRIBUTED:
- return inputDistr;
-
- case RANDOM_DISTRIBUTED:
- case HASH_DISTRIBUTED:
- return random(); // its OK to just erase distribution here
-
- default:
- throw new AssertionError("Unexpected distribution type.");
- }
- }
-
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rows = mq.getRowCount(getInput());
-
- // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
- // currently it's OK to have such a dummy cost because there is no other options
- return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
- }
-
/** */
public static RelDataType rowType(RelDataTypeFactory typeFactory) {
assert typeFactory instanceof IgniteTypeFactory;
@@ -113,8 +92,7 @@ public class IgniteMapAggregate extends Aggregate implements IgniteRel {
}
/** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteMapAggregate(cluster, getTraitSet(), sole(inputs),
- getGroupSet(), getGroupSets(), getAggCallList());
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return computeSelfCostHash(planner, mq);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
new file mode 100644
index 0000000..171b747
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * Map-side node of the sort-based aggregate.
+ * Carries an explicit, non-default collation and produces rows made of the grouping columns followed
+ * by a single {@code AGG_DATA} column holding an array of {@link Accumulator}s.
+ */
+public class IgniteMapSortAggregate extends IgniteAggregate {
+    /** Collation the node was created with; never {@code null} and never the default one. */
+    private final RelCollation collation;
+
+    /**
+     * Creates the node from explicit attributes.
+     *
+     * @param cluster Cluster.
+     * @param traitSet Trait set of the node.
+     * @param input Input node.
+     * @param groupSet Grouping columns.
+     * @param groupSets Grouping sets.
+     * @param aggCalls Aggregate calls.
+     * @param collation Collation of the node, must be non-null and non-default.
+     */
+    public IgniteMapSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelCollation collation
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+
+        assert collation != null && !collation.isDefault();
+
+        this.collation = collation;
+    }
+
+    /**
+     * Creates the node from a {@link RelInput}; the collation is taken from the same input.
+     *
+     * @param input Source of the node attributes.
+     */
+    public IgniteMapSortAggregate(RelInput input) {
+        super(changeTraits(input, IgniteConvention.INSTANCE));
+
+        RelCollation inputCollation = input.getCollation();
+
+        assert inputCollation != null && !inputCollation.isDefault();
+
+        collation = inputCollation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        // The copy keeps the collation of this node.
+        return new IgniteMapSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteMapSortAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList(), collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        RelWriter writer = super.explainTerms(pw);
+
+        return writer.item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelDataType deriveRowType() {
+        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+        RelDataTypeFactory.Builder rowBuilder = new RelDataTypeFactory.Builder(typeFactory);
+
+        // Grouping columns go first, in the iteration order of the group set.
+        for (int grpFieldIdx : groupSet)
+            rowBuilder.add(input.getRowType().getFieldList().get(grpFieldIdx));
+
+        // The accumulators are shipped as one trailing array-typed column.
+        RelDataType accType = typeFactory.createJavaType(Accumulator.class);
+
+        rowBuilder.add("AGG_DATA", typeFactory.createArrayType(accType, -1));
+
+        return rowBuilder.build();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostSort(planner, mq);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelCollation collation() {
+        return collation;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
similarity index 61%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
index 1e4f88b..cfab999 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
@@ -18,12 +18,10 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
@@ -31,34 +29,37 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.Aggregate.Group;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
-public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
+public abstract class IgniteReduceAggregateBase extends SingleRel implements IgniteRel {
/** */
- private final ImmutableBitSet groupSet;
+ protected final ImmutableBitSet groupSet;
/** */
- private final List<ImmutableBitSet> groupSets;
+ protected final List<ImmutableBitSet> groupSets;
/** */
- private final List<AggregateCall> aggCalls;
+ protected final List<AggregateCall> aggCalls;
/** */
- public IgniteReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, RelDataType rowType) {
+ protected IgniteReduceAggregateBase(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls,
+ RelDataType rowType
+ ) {
super(cluster, traits, input);
assert rowType != null;
- assert RelOptUtil.areRowTypesEqual(input.getRowType(),
- IgniteMapAggregate.rowType(Commons.typeFactory(cluster)), true);
this.groupSet = groupSet;
if (groupSets == null)
groupSets = ImmutableList.of(groupSet);
@@ -68,7 +69,7 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
}
/** */
- public IgniteReduceAggregate(RelInput input) {
+ protected IgniteReduceAggregateBase(RelInput input) {
this(
input.getCluster(),
input.getTraitSet().replace(IgniteConvention.INSTANCE),
@@ -85,16 +86,6 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
}
/** {@inheritDoc} */
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new IgniteReduceAggregate(getCluster(), traitSet, sole(inputs), groupSet, groupSets, aggCalls, rowType);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
@Override public RelWriter explainTerms(RelWriter pw) {
super.explainTerms(pw)
.itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES)
@@ -110,15 +101,6 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
return pw;
}
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rows = mq.getRowCount(getInput());
-
- // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
- // currently it's OK to have such a dummy cost because there is no other options
- return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
- }
-
/** */
public ImmutableBitSet groupSet() {
return groupSet;
@@ -133,10 +115,4 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
public List<AggregateCall> aggregateCalls() {
return aggCalls;
}
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteReduceAggregate(cluster, getTraitSet(), sole(inputs),
- groupSet, groupSets, aggCalls, rowType);
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
new file mode 100644
index 0000000..dcda3ac
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Reduce-side node of the hash-based aggregate.
+ * When built from explicit attributes, its input row type is asserted to equal
+ * {@link IgniteMapHashAggregate#rowType(org.apache.calcite.rel.type.RelDataTypeFactory)}.
+ */
+public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
+    /**
+     * Creates the node from explicit attributes.
+     *
+     * @param cluster Cluster.
+     * @param traits Trait set of the node.
+     * @param input Input node.
+     * @param groupSet Grouping columns.
+     * @param groupSets Grouping sets.
+     * @param aggCalls Aggregate calls.
+     * @param rowType Row type of the node.
+     */
+    public IgniteReduceHashAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType
+    ) {
+        super(cluster, traits, input, groupSet, groupSets, aggCalls, rowType);
+
+        assert RelOptUtil.areRowTypesEqual(
+            input.getRowType(),
+            IgniteMapHashAggregate.rowType(Commons.typeFactory(cluster)),
+            true
+        );
+    }
+
+    /**
+     * Creates the node from a {@link RelInput}.
+     *
+     * @param input Source of the node attributes.
+     */
+    public IgniteReduceHashAggregate(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        RelNode newInput = sole(inputs);
+
+        return new IgniteReduceHashAggregate(getCluster(), traitSet, newInput, groupSet, groupSets, aggCalls, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceHashAggregate(
+            cluster,
+            getTraitSet(),
+            sole(inputs),
+            groupSet,
+            groupSets,
+            aggCalls,
+            rowType
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The memory part is {@code groupSet.cardinality() * AVERAGE_FIELD_SIZE} when there are no
+     * aggregate calls; otherwise every call adds {@code AGG_CALL_MEM_COST}, multiplied by the input
+     * row count for DISTINCT calls.
+     */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double inputRows = mq.getRowCount(getInput());
+
+        double memory = 0d;
+
+        if (!aggCalls.isEmpty()) {
+            for (AggregateCall call : aggCalls)
+                memory += call.isDistinct() ? IgniteCost.AGG_CALL_MEM_COST * inputRows : IgniteCost.AGG_CALL_MEM_COST;
+        }
+        else
+            memory = groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        double cpu = inputRows * IgniteCost.ROW_PASS_THROUGH_COST;
+
+        return costFactory.makeCost(inputRows, cpu, 0, memory, 0);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
new file mode 100644
index 0000000..bb71bf1
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+
+/**
+ * Reduce-side node of the sort-based aggregate.
+ * Carries an explicit, non-default collation which is printed among the explain terms.
+ */
+public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
+    /** Collation the node was created with; never {@code null} and never the default one. */
+    private final RelCollation collation;
+
+    /**
+     * Creates the node from explicit attributes.
+     *
+     * @param cluster Cluster.
+     * @param traits Trait set of the node.
+     * @param input Input node.
+     * @param groupSet Grouping columns.
+     * @param groupSets Grouping sets.
+     * @param aggCalls Aggregate calls.
+     * @param rowType Row type of the node.
+     * @param collation Collation of the node, must be non-null and non-default.
+     */
+    public IgniteReduceSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType,
+        RelCollation collation
+    ) {
+        super(cluster, traits, input, groupSet, groupSets, aggCalls, rowType);
+
+        assert collation != null && !collation.isDefault();
+
+        this.collation = collation;
+    }
+
+    /**
+     * Creates the node from a {@link RelInput}; the collation is taken from the same input.
+     *
+     * @param input Source of the node attributes.
+     */
+    public IgniteReduceSortAggregate(RelInput input) {
+        super(input);
+
+        RelCollation inputCollation = input.getCollation();
+
+        assert inputCollation != null && !inputCollation.isDefault();
+
+        collation = inputCollation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return recreate(getCluster(), traitSet, sole(inputs));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return recreate(cluster, getTraitSet(), sole(inputs));
+    }
+
+    /**
+     * Builds a node with the grouping, aggregate calls, row type and collation of this one.
+     *
+     * @param cluster Cluster of the new node.
+     * @param traits Trait set of the new node.
+     * @param input Input of the new node.
+     * @return New node.
+     */
+    private IgniteReduceSortAggregate recreate(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+        return new IgniteReduceSortAggregate(cluster, traits, input, groupSet, groupSets, aggCalls, rowType, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        RelWriter writer = super.explainTerms(pw);
+
+        return writer.item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double inputRows = mq.getRowCount(getInput());
+
+        double cpu = inputRows * IgniteCost.ROW_PASS_THROUGH_COST;
+
+        return costFactory.makeCost(inputRows, cpu, 0);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 2db2cc1..8c620a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -79,17 +79,32 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteAggregate rel);
+ T visit(IgniteHashAggregate rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteMapAggregate rel);
+ T visit(IgniteMapHashAggregate rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteReduceAggregate rel);
+ T visit(IgniteReduceHashAggregate rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteSortAggregate rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteMapSortAggregate rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteReduceSortAggregate rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
new file mode 100644
index 0000000..0a2e3e5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ * Sort-based aggregate node.
+ * The node keeps the collation of its trait set (or of the {@link RelInput} it is read from) and hands
+ * it over to the {@link IgniteMapSortAggregate} / {@link IgniteReduceSortAggregate} nodes it creates.
+ */
+public class IgniteSortAggregate extends IgniteAggregateBase {
+    /** Collation of the node; never the default one. */
+    private final RelCollation collation;
+
+    /**
+     * Creates the node from explicit attributes. The collation is taken from {@code traitSet}.
+     *
+     * @param cluster Cluster.
+     * @param traitSet Trait set of the node; its collation must not be the default one.
+     * @param input Input node.
+     * @param groupSet Grouping columns, must not be empty.
+     * @param groupSets Grouping sets; at most one grouping set is supported.
+     * @param aggCalls Aggregate calls.
+     */
+    public IgniteSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+
+        RelCollation traitCollation = TraitUtils.collation(traitSet);
+
+        assert !traitCollation.isDefault();
+
+        // Check the list held by the node rather than the raw argument: the argument may be null,
+        // in which case the base class stores a singleton list of the group set.
+        assert !groupSet.isEmpty() && getGroupSets().size() == 1;
+
+        collation = traitCollation;
+    }
+
+    /**
+     * Creates the node from a {@link RelInput}; the collation is taken from the same input.
+     *
+     * @param input Source of the node attributes.
+     */
+    public IgniteSortAggregate(RelInput input) {
+        super(input);
+
+        collation = input.getCollation();
+
+        assert collation != null;
+        assert !collation.isDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        // The copy keeps the collation of this node regardless of the one in the given trait set.
+        return new IgniteSortAggregate(getCluster(), traitSet.replace(collation), input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteSortAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        // Collation over the grouping columns; named so it does not shadow the field.
+        RelCollation grpCollation = RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));
+
+        return Pair.of(nodeTraits.replace(grpCollation),
+            ImmutableList.of(inputTraits.get(0).replace(grpCollation)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
+
+        // Keep only the candidate collations over the grouping columns that the input already satisfies.
+        List<RelCollation> satisfiedCollations = TraitUtils.permute(groupSet.asList()).stream()
+            .filter(inputCollation::satisfies)
+            .collect(Collectors.toList());
+
+        if (satisfiedCollations.isEmpty())
+            return ImmutableList.of();
+
+        return satisfiedCollations.stream()
+            .map(col -> Pair.of(nodeTraits.replace(col), inputTraits))
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        // Honor the cluster handed in by the caller instead of silently substituting getCluster().
+        return new IgniteMapSortAggregate(cluster, traits, input, groupSet, groupSets, aggCalls, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
+        RelDataType rowType) {
+        // Honor the cluster and row type handed in by the caller instead of ignoring both arguments.
+        return new IgniteReduceSortAggregate(cluster, traits, input, groupSet, groupSets,
+            aggCalls, rowType, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelCollation collation() {
+        assert collation.equals(super.collation());
+
+        return collation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostSort(planner, mq);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
similarity index 83%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index 0955517..b1cf2e2a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -25,19 +25,19 @@ import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
/**
*
*/
-public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+public class HashAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
/** */
- public static final RelOptRule INSTANCE = new AggregateConverterRule();
+ public static final RelOptRule INSTANCE = new HashAggregateConverterRule();
/** */
- public AggregateConverterRule() {
- super(LogicalAggregate.class, "AggregateConverterRule");
+ public HashAggregateConverterRule() {
+ super(LogicalAggregate.class, "HashAggregateConverterRule");
}
/** {@inheritDoc} */
@@ -48,7 +48,7 @@ public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalA
RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
RelNode input = convert(rel.getInput(), inTrait);
- return new IgniteAggregate(cluster, outTrait, input,
+ return new IgniteHashAggregate(cluster, outTrait, input,
rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
similarity index 62%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
index 0955517..a4dc10a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
@@ -22,33 +22,47 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.util.typedef.F;
-/**
- *
- */
-public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+/** */
+public class SortAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
/** */
- public static final RelOptRule INSTANCE = new AggregateConverterRule();
+ public static final RelOptRule INSTANCE = new SortAggregateConverterRule();
/** */
- public AggregateConverterRule() {
- super(LogicalAggregate.class, "AggregateConverterRule");
+ public SortAggregateConverterRule() {
+ super(LogicalAggregate.class, "SortAggregateConverterRule");
}
/** {@inheritDoc} */
@Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalAggregate rel) {
+ // Applicable only for GROUP BY
+ if (F.isEmpty(rel.getGroupSet()) || rel.getGroupSets().size() > 1)
+ return null;
+
RelOptCluster cluster = rel.getCluster();
RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelNode input = convert(rel.getInput(), inTrait);
+ RelNode input = rel.getInput();
+
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(rel.getGroupSet().asList()));
- return new IgniteAggregate(cluster, outTrait, input,
- rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
+ return new IgniteSortAggregate(
+ cluster,
+ outTrait.replace(collation),
+ convert(input, inTrait.replace(collation)),
+ rel.getGroupSet(),
+ rel.getGroupSets(),
+ rel.getAggCallList()
+ );
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
index 9183c62..cd85348 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.rule;
import java.util.List;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
@@ -28,8 +29,8 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -54,7 +55,7 @@ public class UnionConverterRule extends AbstractIgniteConverterRule<LogicalUnion
PhysicalNode res = new IgniteUnionAll(cluster, traits, inputs);
if (!rel.all)
- res = new IgniteAggregate(cluster, traits, res,
+ res = new IgniteHashAggregate(cluster, traits, res,
ImmutableBitSet.range(rel.getRowType().getFieldCount()), null, ImmutableList.of());
return res;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 3856c83..b128566 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -45,6 +47,7 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ControlFlowException;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
@@ -406,6 +409,43 @@ public class TraitUtils {
}
/**
+ * Creates a list of collations covering every permutation of the specified keys.
+ * <p>
+ * The orderings are enumerated with the iterative form of Heap's algorithm: the first collation uses the keys
+ * in the given order and every following one is obtained from its predecessor by a single swap, so the result
+ * holds {@code keys.size()!} collations. The passed list is copied first and is never modified.
+ *
+ * @param keys The keys to create collations from.
+ * @return Collations for all orderings of the keys, starting with the original order.
+ */
+ public static List<RelCollation> permute(List<Integer> keys) {
+ keys = new ArrayList<>(keys); // Private copy, swaps below must not touch the caller's list.
+
+ List<RelCollation> res = new ArrayList<>();
+
+ int[] indexes = new int[keys.size()]; // Per-position swap counters of Heap's algorithm.
+ Arrays.fill(indexes, 0); // Redundant: a freshly allocated int[] is already zero-filled.
+
+ res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
+
+ int i = 0;
+
+ while (i < keys.size()) {
+ if (indexes[i] < i) {
+ Collections.swap(keys, i % 2 == 0 ? 0 : indexes[i], i); // Even position: swap with 0, odd: with counter.
+
+ res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
+
+ indexes[i]++;
+ i = 0; // Restart from the lowest position after each emitted permutation.
+ }
+ else {
+ indexes[i] = 0;
+ i++;
+ }
+ }
+
+ return res;
+ }
+
+
+ /**
* @param elem Elem.
*/
private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
index 439c06e..5a0dfe8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
@@ -42,6 +42,10 @@ public class AggregatesIntegrationTest extends AbstractBasicIntegrationTest {
assertQuery("select count(*) from person where salary > 10").returns(2L).check();
assertQuery("select count(1) from person where salary > 10").returns(2L).check();
+ assertQuery("select count(name) filter (where salary > 10) from person").returns(1L).check();
+ assertQuery("select count(*) filter (where salary > 10) from person").returns(2L).check();
+ assertQuery("select count(1) filter (where salary > 10) from person").returns(2L).check();
+
assertQuery("select salary, count(name) from person group by salary order by salary")
.returns(10d, 3L)
.returns(15d, 1L)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
new file mode 100644
index 0000000..718b2e2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Sort aggregate integration test.
+ */
+public class SortAggregateIntegrationTest extends GridCommonAbstractTest {
+ /** */
+ public static final int ROWS = 103;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Calls {@code startGrids(2)}, so the test runs against two grids.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Refills cache {@code TEST} (obtained from grid 0) with {@link #ROWS} entries and waits for partition map
+ * exchange. NOTE(review): {@code fillCache} already ends with {@code awaitPartitionMapExchange()}, so the
+ * second wait here looks redundant.
+ */
+ @Override protected void beforeTest() throws Exception {
+ fillCache(grid(0).cache("TEST"), ROWS);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Clears cache {@code TEST} (obtained from grid 0) and then runs the parent cleanup.
+ */
+ @Override protected void afterTest() throws Exception {
+ grid(0).cache("TEST").clear();
+
+ super.afterTest();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Adds a single {@code PARTITIONED} cache to the parent configuration. The cache is named after the table
+ * ({@code TEST}), is exposed in SQL schema {@code PUBLIC} and uses a {@link RendezvousAffinityFunction}
+ * created with arguments {@code (false, 8)}. Its query entity maps an {@code Integer} key field {@code ID} and
+ * a {@link TestVal} value with {@code Integer} fields {@code GRP0}, {@code GRP1}, {@code VAL0}, {@code VAL1},
+ * and declares one sorted index over {@code (GRP0, GRP1)}.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ QueryEntity part = new QueryEntity()
+ .setTableName("TEST")
+ .setKeyType(Integer.class.getName())
+ .setValueType(TestVal.class.getName())
+ .setKeyFieldName("ID")
+ .addQueryField("ID", Integer.class.getName(), null)
+ .addQueryField("GRP0", Integer.class.getName(), null)
+ .addQueryField("GRP1", Integer.class.getName(), null)
+ .addQueryField("VAL0", Integer.class.getName(), null)
+ .addQueryField("VAL1", Integer.class.getName(), null)
+ .setIndexes(Collections.singletonList(new QueryIndex(Arrays.asList("GRP0", "GRP1"), QueryIndexType.SORTED)));
+
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(
+ new CacheConfiguration<>(part.getTableName())
+ .setAffinity(new RendezvousAffinityFunction(false, 8))
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setQueryEntities(singletonList(part))
+ .setSqlSchema("PUBLIC")
+ );
+ }
+
+ /**
+ * Runs a grouped query over table {@code TEST} with the {@code HashAggregateConverterRule} switched off through
+ * the {@code DISABLE_RULE} hint and checks the result.
+ * <p>
+ * Rows are created as {@code new TestVal(i)} for {@code i} in {@code [0, ROWS)}, i.e. {@code grp0 = i / 10},
+ * {@code val0 = 1}, {@code val1 = 2}. With {@code ROWS = 103} the groups 0..9 hold ten rows each
+ * ({@code SUM(val1) = 20}) while group 10 holds only three rows ({@code SUM(val1) = 6}) and is dropped by the
+ * {@code HAVING} clause, which leaves {@code ROWS / 10} rows. For every returned row {@code SUM(val1)} must be
+ * twice {@code SUM(val0)}.
+ */
+ @Test
+ public void mapReduceAggregate() {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors = engine.query(
+ null,
+ "PUBLIC",
+ "SELECT /*+ DISABLE_RULE('HashAggregateConverterRule') */" +
+ "SUM(val0), SUM(val1), grp0 FROM TEST " +
+ "GROUP BY grp0 " +
+ "HAVING SUM(val1) > 10",
+ X.EMPTY_OBJECT_ARRAY
+ );
+
+ List<List<?>> res = cursors.get(0).getAll();
+
+ assertEquals(ROWS / 10, res.size());
+
+ res.forEach(r -> {
+ Integer s0 = (Integer)r.get(0);
+ Integer s1 = (Integer)r.get(1);
+
+ assertEquals(s0 * 2, (int)s1);
+ });
+ }
+
+ /**
+ * Clears the cache, puts {@code rows} entries with keys {@code 0 .. rows - 1} and values {@code new TestVal(key)}
+ * into it and then waits for partition map exchange.
+ * NOTE(review): {@code c} is declared with the raw {@code IgniteCache} type, so the {@code put} call is unchecked;
+ * {@code IgniteCache<Integer, TestVal>} would match the values stored here.
+ *
+ * @param c Cache.
+ * @param rows Rows count.
+ * @throws InterruptedException Declared for the {@code awaitPartitionMapExchange()} call.
+ */
+ private void fillCache(IgniteCache c, int rows) throws InterruptedException {
+ c.clear();
+
+ for (int i = 0; i < rows; ++i)
+ c.put(i, new TestVal(i));
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Cache value used by the test. All fields are derived from the integer passed to the constructor;
+ * presumably they back the {@code GRP0}, {@code GRP1}, {@code VAL0} and {@code VAL1} query fields - confirm
+ * against the query entity mapping.
+ */
+ public static class TestVal {
+ /** First grouping value: {@code k / 10}. */
+ int grp0;
+
+ /** Second grouping value: {@code k / 100}. */
+ int grp1;
+
+ /** Always {@code 1}. */
+ int val0;
+
+ /** Always {@code 2}. */
+ int val1;
+
+ /**
+ * @param k Number the grouping fields are derived from.
+ */
+ TestVal(int k) {
+ grp0 = k / 10;
+ grp1 = k / 100;
+
+ val0 = 1;
+ val1 = 2;
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 02bc083..a490e6c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -292,6 +292,11 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
}
/** */
+ protected Object[] row(Object... fields) {
+ return fields;
+ }
+
+ /** */
private static class TestMessageServiceImpl extends MessageServiceImpl {
/** */
private final TestIoManager mgr;
@@ -338,24 +343,38 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
/** */
TestTable(int rowsCnt, RelDataType rowType) {
+ this(
+ rowsCnt,
+ rowType,
+ rowType.getFieldList().stream()
+ .map((Function<RelDataTypeField, Function<Integer, Object>>)(t) -> {
+ switch (t.getType().getSqlTypeName().getFamily()) {
+ case NUMERIC:
+ return TestTable::intField;
+
+ case CHARACTER:
+ return TestTable::stringField;
+
+ default:
+ assert false : "Not supported type for test: " + t;
+ return null;
+ }
+ })
+ .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()])
+ );
+ }
+
+ /** */
+ TestTable(int rowsCnt, RelDataType rowType, Function<Integer, Object>... fieldCreators) {
this.rowsCnt = rowsCnt;
this.rowType = rowType;
+ this.fieldCreators = fieldCreators;
+ }
- fieldCreators = rowType.getFieldList().stream()
- .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
- switch (t.getType().getSqlTypeName().getFamily()) {
- case NUMERIC:
- return TestTable::intField;
-
- case CHARACTER:
- return TestTable::stringField;
- default:
- assert false : "Not supported type for test: " + t;
- return null;
- }
- })
- .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
+ /** */
+ private static Object field(Integer rowNum) {
+ return "val_" + rowNum;
}
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
new file mode 100644
index 0000000..4ef09e6
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public abstract class BaseAggregateTest extends AbstractExecutionTest {
+ /** Last parameter number. */
+ protected static final int TEST_AGG_PARAM_NUM = LAST_PARAM_NUM + 1;
+
+ /** */
+ @Parameterized.Parameter(TEST_AGG_PARAM_NUM)
+ public TestAggregateType testAgg;
+
+ /**
+ * Builds the parameter sets for the parameterized run: every array returned by
+ * {@code AbstractExecutionTest.parameters()} is combined with each {@link TestAggregateType}, the aggregate type
+ * being appended as the last element.
+ *
+ * @return Inherited parameter arrays extended with the aggregate type.
+ */
+ @Parameterized.Parameters(name = PARAMS_STRING + ", type={" + TEST_AGG_PARAM_NUM + "}")
+ public static List<Object[]> data() {
+ List<Object[]> extraParams = new ArrayList<>();
+
+ ImmutableList<Object[]> newParams = ImmutableList.of(
+ new Object[] {TestAggregateType.SINGLE},
+ new Object[] {TestAggregateType.MAP_REDUCE}
+ );
+
+ for (Object[] newParam : newParams) {
+ for (Object[] inheritedParam : AbstractExecutionTest.parameters()) {
+ Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam))
+ .toArray(Object[]::new);
+
+ extraParams.add(both);
+ }
+ }
+
+ return extraParams;
+ }
+
+ /**
+ * Sets {@code nodesCnt} to 1 and then runs the parent setup.
+ *
+ * @throws Exception If failed.
+ */
+ @Before
+ @Override public void setup() throws Exception {
+ nodesCnt = 1;
+ super.setup();
+ }
+
+ /**
+ * Checks {@code COUNT} without arguments grouped by column 0.
+ * <p>
+ * Input rows are {@code (0, 200), (1, 300), (1, 1400), (0, 1000)}; the expected output is {@code (0, 2)} followed
+ * by {@code (1, 2)}. The aggregate node chain is created for the aggregate type selected by {@code testAgg}.
+ * NOTE(review): {@code aggRowType} is created from a single {@code int.class} while the expected rows hold two
+ * values (group key and aggregate) - confirm this is intended.
+ */
+ @Test
+ public void count() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row(0, 200),
+ row(1, 300),
+ row(1, 1400),
+ row(0, 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.COUNT,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+ root.register(aggChain);
+
+ assertTrue(root.hasNext());
+
+ Assert.assertArrayEquals(row(0, 2), root.next());
+ Assert.assertArrayEquals(row(1, 2), root.next());
+
+ assertFalse(root.hasNext());
+ }
+
+ /**
+ * Checks {@code MIN} over column 1 grouped by column 0.
+ * <p>
+ * Input rows are {@code (0, 200), (1, 300), (1, 1400), (0, 1000)}; the expected output is {@code (0, 200)}
+ * followed by {@code (1, 300)}. The aggregate node chain is created for the aggregate type selected by
+ * {@code testAgg}.
+ * NOTE(review): {@code aggRowType} is created from a single {@code int.class} while the expected rows hold two
+ * values (group key and aggregate) - confirm this is intended.
+ */
+ @Test
+ public void min() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row(0, 200),
+ row(1, 300),
+ row(1, 1400),
+ row(0, 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MIN,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+ root.register(aggChain);
+
+ assertTrue(root.hasNext());
+
+ Assert.assertArrayEquals(row(0, 200), root.next());
+ Assert.assertArrayEquals(row(1, 300), root.next());
+
+ assertFalse(root.hasNext());
+ }
+
+ /**
+ * Checks {@code MAX} over column 1 grouped by column 0.
+ * <p>
+ * Input rows are {@code (0, 200), (1, 300), (1, 1400), (0, 1000)}; the expected output is {@code (0, 1000)}
+ * followed by {@code (1, 1400)}. The aggregate node chain is created for the aggregate type selected by
+ * {@code testAgg}.
+ * NOTE(review): {@code aggRowType} is created from a single {@code int.class} while the expected rows hold two
+ * values (group key and aggregate) - confirm this is intended.
+ */
+ @Test
+ public void max() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row(0, 200),
+ row(1, 300),
+ row(1, 1400),
+ row(0, 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MAX,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+ root.register(aggChain);
+
+ assertTrue(root.hasNext());
+
+ Assert.assertArrayEquals(row(0, 1000), root.next());
+ Assert.assertArrayEquals(row(1, 1400), root.next());
+
+ assertFalse(root.hasNext());
+ }
+
+ /**
+ * Checks {@code AVG} over column 1 grouped by column 0, with an {@code int} result type.
+ * <p>
+ * Input rows are {@code (0, 200), (1, 300), (1, 1300), (0, 1000)}; the expected output is {@code (0, 600)}
+ * followed by {@code (1, 800)}. The aggregate node chain is created for the aggregate type selected by
+ * {@code testAgg}.
+ * NOTE(review): {@code aggRowType} is created from a single {@code int.class} while the expected rows hold two
+ * values (group key and aggregate) - confirm this is intended.
+ */
+ @Test
+ public void avg() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row(0, 200),
+ row(1, 300),
+ row(1, 1300),
+ row(0, 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.AVG,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+ root.register(aggChain);
+
+ assertTrue(root.hasNext());
+
+ Assert.assertArrayEquals(row(0, 600), root.next());
+ Assert.assertArrayEquals(row(1, 800), root.next());
+
+ assertFalse(root.hasNext());
+ }
+
+ /**
+ * Checks {@code SUM} over column 1 grouped by column 0 for several combinations of group count and group size.
+ * <p>
+ * The sizes are derived from the {@code IN_BUFFER_SIZE} field of {@link AbstractNode}, read via {@code U.field}:
+ * group counts are {@code 1, bufSize / 2, bufSize, bufSize + 1, bufSize * 4} and rows per group are
+ * {@code 1, 5, bufSize}. Row {@code r} of the generated table is {@code (r / rowsInGroup, r % rowsInGroup)}, so
+ * each group sums the values {@code 0 .. rowsInGroup - 1}, which gives {@code (rowsInGroup - 1) * rowsInGroup / 2}.
+ * Besides the sum, the test verifies that every group id in {@code [0, grps)} shows up in the output.
+ *
+ * @throws IgniteCheckedException Declared by the test, presumably for the {@code U.field} lookup - confirm.
+ */
+ @Test
+ public void sumOnDifferentRowsCount() throws IgniteCheckedException {
+ int bufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ int[] grpsCount = {1, bufSize / 2, bufSize, bufSize + 1, bufSize * 4};
+ int[] rowsInGroups = {1, 5, bufSize};
+
+ for (int grps : grpsCount) {
+ for (int rowsInGroup : rowsInGroups) {
+ log.info("Check: [grps=" + grps + ", rowsInGroup=" + rowsInGroup + ']');
+
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+
+ ScanNode<Object[]> scan = new ScanNode<>(
+ ctx,
+ rowType,
+ new TestTable(
+ grps * rowsInGroup,
+ rowType,
+ (r) -> r / rowsInGroup,
+ (r) -> r % rowsInGroup
+ )
+ );
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.SUM,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+ root.register(aggChain);
+
+ Set<Integer> grpId = IntStream.range(0, grps).boxed().collect(Collectors.toSet());
+
+ while (root.hasNext()) {
+ Object[] row = root.next();
+
+ grpId.remove(row[0]); // Tick off the group id carried in column 0.
+
+ assertEquals((rowsInGroup - 1) * rowsInGroup / 2, row[1]);
+ }
+
+ assertTrue(grpId.isEmpty()); // Every expected group has been returned.
+ }
+ }
+ }
+
+
+ /**
+ * Creates the aggregate node chain for the aggregate type of the current parameterized run ({@code testAgg}),
+ * passing all arguments through to {@link #createSingleAggregateNodesChain} or
+ * {@link #createMapReduceAggregateNodesChain}.
+ *
+ * @return Chain created by the selected factory method. For an unknown type the method trips an assertion, or
+ * returns {@code null} when assertions are disabled.
+ */
+ private SingleNode<Object[]> createAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall aggCall,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ ) {
+ switch (testAgg) {
+ case SINGLE:
+ return createSingleAggregateNodesChain(ctx, grpSets, aggCall, inRowType, aggRowType, rowFactory, scan);
+
+ case MAP_REDUCE:
+ return createMapReduceAggregateNodesChain(ctx, grpSets, aggCall, inRowType, aggRowType, rowFactory, scan);
+
+ default:
+ assert false;
+
+ return null;
+ }
+ }
+
+ /**
+ * Creates the node chain used when {@code testAgg} is {@link TestAggregateType#SINGLE}. Implemented by the
+ * concrete aggregate tests.
+ */
+ protected abstract SingleNode<Object[]> createSingleAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall aggCall,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ );
+
+ /**
+ * Creates the node chain used when {@code testAgg} is {@link TestAggregateType#MAP_REDUCE}. Implemented by the
+ * concrete aggregate tests.
+ */
+ protected abstract SingleNode<Object[]> createMapReduceAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall call,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ );
+
+ /**
+ * Obtains an accumulators supplier for a single aggregate call from the expression factory of the given
+ * execution context.
+ *
+ * @param ctx Execution context.
+ * @param call Aggregate call; it is wrapped into a one-element list.
+ * @param type Aggregate type.
+ * @param rowType Row type handed to the expression factory.
+ * @return Accumulators supplier.
+ */
+ protected Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(
+ ExecutionContext<Object[]> ctx,
+ AggregateCall call,
+ AggregateType type,
+ RelDataType rowType
+ ) {
+ return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType);
+ }
+
+ /**
+ * Creates a row factory for {@code Object[]} rows: the handler is {@code ArrayRowHandler.INSTANCE} and a row
+ * built from fields is the passed array itself. Creating an empty row is not supported.
+ *
+ * @return Row factory.
+ */
+ protected RowHandler.RowFactory<Object[]> rowFactory() {
+ return new RowHandler.RowFactory<Object[]>() {
+ /** */
+ @Override public RowHandler<Object[]> handler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+
+ /** */
+ @Override public Object[] create() {
+ throw new AssertionError(); // Not expected to be called by these tests.
+ }
+
+ /** */
+ @Override public Object[] create(Object... fields) {
+ return fields;
+ }
+ };
+ }
+
+ /**
+ * Kind of aggregate node chain a parameterized run is executed with; selects the factory method used by
+ * {@code createAggregateNodesChain}.
+ */
+ enum TestAggregateType {
+ /** Chain created by {@code createSingleAggregateNodesChain}. */
+ SINGLE,
+
+ /** Chain created by {@code createMapReduceAggregateNodesChain}. */
+ MAP_REDUCE
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 4ba60cb..bf2cff0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -21,25 +21,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import java.util.function.Supplier;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -58,9 +46,6 @@ import static org.apache.calcite.rel.core.JoinRelType.INNER;
import static org.apache.calcite.rel.core.JoinRelType.LEFT;
import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
import static org.apache.calcite.rel.core.JoinRelType.SEMI;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.MAP;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.REDUCE;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.SINGLE;
/**
*
@@ -468,468 +453,6 @@ public class ExecutionTest extends AbstractExecutionTest {
*
*/
@Test
- public void testAggregateMapReduceAvg() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.AVG,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(double.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType mapType = IgniteMapAggregate.rowType(tf);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
- map.register(scan);
-
- RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
- reduce.register(map);
-
- RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
- root.register(reduce);
-
- assertTrue(root.hasNext());
- assertEquals(725d, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMapReduceSum() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.SUM,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType mapType = IgniteMapAggregate.rowType(tf);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
- map.register(scan);
-
- RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
- reduce.register(map);
-
- RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
- root.register(reduce);
-
- assertTrue(root.hasNext());
- assertEquals(2900, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMapReduceMin() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.MIN,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType mapType = IgniteMapAggregate.rowType(tf);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
- map.register(scan);
-
- RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
- reduce.register(map);
-
- RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
- root.register(reduce);
-
- assertTrue(root.hasNext());
- assertEquals(200, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMapReduceMax() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.MAX,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType mapType = IgniteMapAggregate.rowType(tf);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
- map.register(scan);
-
- RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
- reduce.register(map);
-
- RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
- root.register(reduce);
-
- assertTrue(root.hasNext());
- assertEquals(1400, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMapReduceCount() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.COUNT,
- false,
- false,
- false,
- ImmutableIntList.of(),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType mapType = IgniteMapAggregate.rowType(tf);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
- map.register(scan);
-
- RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
- reduce.register(map);
-
- RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
- root.register(reduce);
-
- assertTrue(root.hasNext());
- assertEquals(4, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateAvg() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.AVG,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(double.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- assertEquals(725d, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateSum() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.SUM,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- assertEquals(2900, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMin() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.MIN,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- assertEquals(200, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateMax() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.MAX,
- false,
- false,
- false,
- ImmutableIntList.of(1),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- assertEquals(1400, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateCount() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 200),
- row("Roman", 300),
- row("Ivan", 1400),
- row("Alexey", 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.COUNT,
- false,
- false,
- false,
- ImmutableIntList.of(),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- assertEquals(4, root.next()[0]);
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
- public void testAggregateCountByGroup() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class, int.class);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 0, 200),
- row("Roman", 1, 300),
- row("Ivan", 1, 1400),
- row("Alexey", 0, 1000)
- ));
-
- AggregateCall call = AggregateCall.create(
- SqlStdOperatorTable.COUNT,
- false,
- false,
- false,
- ImmutableIntList.of(),
- -1,
- RelCollations.EMPTY,
- tf.createJavaType(int.class),
- null);
-
- ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(1));
-
- RelDataType aggType = TypeUtils.createRowType(tf, int.class);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
- agg.register(scan);
-
- RootNode<Object[]> root = new RootNode<>(ctx, aggType);
- root.register(agg);
-
- assertTrue(root.hasNext());
- // TODO needs a sort, relying on an order in a hash table looks strange
- Assert.assertArrayEquals(row(1, 2), root.next());
- Assert.assertArrayEquals(row(0, 2), root.next());
- assertFalse(root.hasNext());
- }
-
- /**
- *
- */
- @Test
public void testCorrelatedNestedLoopJoin() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
IgniteTypeFactory tf = ctx.getTypeFactory();
@@ -1049,7 +572,7 @@ public class ExecutionTest extends AbstractExecutionTest {
/**
* Test verifies that an AssertionError thrown from an execution node
- * proprely handled by a task executor.
+ * properly handled by a task executor.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
@@ -1084,41 +607,11 @@ public class ExecutionTest extends AbstractExecutionTest {
/**
*
*/
- private Object[] row(Object... fields) {
+ protected Object[] row(Object... fields) {
return fields;
}
/**
- *
- */
- private Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(ExecutionContext<Object[]> ctx, AggregateCall call,
- AggregateNode.AggregateType type, RelDataType rowType) {
- return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType);
- }
-
- /**
- *
- */
- private RowFactory<Object[]> rowFactory() {
- return new RowFactory<Object[]>() {
- /** */
- @Override public RowHandler<Object[]> handler() {
- return ArrayRowHandler.INSTANCE;
- }
-
- /** */
- @Override public Object[] create() {
- throw new AssertionError();
- }
-
- /** */
- @Override public Object[] create(Object... fields) {
- return fields;
- }
- };
- }
-
- /**
* Node that always throws assertion error except for {@link #close()}
* and {@link #onRegister(Downstream)} methods.
*/
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java
new file mode 100644
index 0000000..a475e00
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ * Runs the {@link BaseAggregateTest} scenarios on {@link HashAggregateNode} chains topped with a {@link SortNode}.
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class HashAggregateExecutionTest extends BaseAggregateTest {
+ /** {@inheritDoc} */
+ @Override protected SingleNode<Object[]> createSingleAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall call,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ ) {
+ assert grpSets.size() == 1 : "Test checks only simple GROUP BY";
+
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(
+ ctx,
+ aggRowType,
+ SINGLE,
+ grpSets,
+ accFactory(ctx, call, SINGLE, inRowType),
+ rowFactory
+ );
+
+ agg.register(scan);
+
+ // Collation on the leading fields emulates planner behavior:
+ // the group keys are placed at the beginning of the output row.
+ RelCollation collation = RelCollations.of(
+ ImmutableIntList.copyOf(
+ IntStream.range(0, F.first(grpSets).cardinality()).boxed().collect(Collectors.toList())
+ )
+ );
+
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ // Sort node on top makes the result order checkable; it passes aggregate rows through, hence aggRowType.
+ SortNode<Object[]> sort = new SortNode<>(ctx, aggRowType, cmp);
+
+ sort.register(agg);
+
+ return sort;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall call,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ ) {
+ assert grpSets.size() == 1 : "Test checks only simple GROUP BY";
+
+ HashAggregateNode<Object[]> aggMap = new HashAggregateNode<>(
+ ctx,
+ aggRowType,
+ MAP,
+ grpSets,
+ accFactory(ctx, call, MAP, inRowType),
+ rowFactory
+ );
+
+ aggMap.register(scan);
+
+ HashAggregateNode<Object[]> aggRdc = new HashAggregateNode<>(
+ ctx,
+ aggRowType,
+ REDUCE,
+ grpSets,
+ accFactory(ctx, call, REDUCE, aggRowType),
+ rowFactory
+ );
+
+ aggRdc.register(aggMap);
+
+ // Collation on the leading fields emulates planner behavior:
+ // the group keys are placed at the beginning of the output row.
+ RelCollation collation = RelCollations.of(
+ ImmutableIntList.copyOf(
+ IntStream.range(0, F.first(grpSets).cardinality()).boxed().collect(Collectors.toList())
+ )
+ );
+
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ // Sort node on top makes the result order checkable; it passes aggregate rows through.
+ SortNode<Object[]> sort = new SortNode<>(ctx, aggRowType, cmp);
+
+ sort.register(aggRdc);
+
+ return sort;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
new file mode 100644
index 0000000..5f99aa3
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
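+/**
+ * Execution tests of {@link HashAggregateNode} with a single empty grouping set (one result row per query).
+ */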
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest {
+ /** Sets {@code nodesCnt} to 1 and then delegates to the base class setup.
+ * @throws Exception If failed.
+ */
+ @Before
+ @Override public void setup() throws Exception {
+ nodesCnt = 1;
+ super.setup();
+ }
+
+ /** Checks AVG of column 1 through a MAP node feeding a REDUCE node: four rows (200, 300, 1400, 1000) give 725. */
+ @Test
+ public void mapReduceAvg() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.AVG,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(double.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+ HashAggregateNode<Object[]> map = new HashAggregateNode<>(
+ ctx,
+ mapType,
+ MAP,
+ grpSets,
+ accFactory(ctx, call, MAP, rowType),
+ rowFactory()
+ );
+
+ map.register(scan);
+
+ RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
+ HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(
+ ctx,
+ reduceType,
+ REDUCE,
+ grpSets,
+ accFactory(ctx, call, REDUCE, null),
+ rowFactory()
+ );
+
+ reduce.register(map);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+ root.register(reduce);
+
+ assertTrue(root.hasNext());
+ assertEquals(725d, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks SUM of column 1 through a MAP node feeding a REDUCE node: four rows (200, 300, 1400, 1000) give 2900. */
+ @Test
+ public void mapReduceSum() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.SUM,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+ HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+ map.register(scan);
+
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+ reduce.register(map);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+ root.register(reduce);
+
+ assertTrue(root.hasNext());
+ assertEquals(2900, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks MIN of column 1 through a MAP node feeding a REDUCE node: four rows (200, 300, 1400, 1000) give 200. */
+ @Test
+ public void mapReduceMin() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MIN,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+ HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+ map.register(scan);
+
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+ reduce.register(map);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+ root.register(reduce);
+
+ assertTrue(root.hasNext());
+ assertEquals(200, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks MAX of column 1 through a MAP node feeding a REDUCE node: four rows (200, 300, 1400, 1000) give 1400. */
+ @Test
+ public void mapReduceMax() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MAX,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+ HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+ map.register(scan);
+
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+ reduce.register(map);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+ root.register(reduce);
+
+ assertTrue(root.hasNext());
+ assertEquals(1400, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks COUNT with an empty argument list through a MAP node feeding a REDUCE node: four input rows give 4. */
+ @Test
+ public void mapReduceCount() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.COUNT,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+ HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+ map.register(scan);
+
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+ reduce.register(map);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+ root.register(reduce);
+
+ assertTrue(root.hasNext());
+ assertEquals(4, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks AVG of column 1 with one SINGLE node: rows (200, 300, 1400, 1000) give 725; output row type is double like the call. */
+ @Test
+ public void singleAvg() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.AVG,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(double.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, double.class);
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ agg.register(scan);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+ root.register(agg);
+
+ assertTrue(root.hasNext());
+ assertEquals(725d, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks SUM of column 1 with one SINGLE node: four rows (200, 300, 1400, 1000) give 2900. */
+ @Test
+ public void singleSum() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.SUM,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1),
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ agg.register(scan);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+ root.register(agg);
+
+ assertTrue(root.hasNext());
+ assertEquals(2900, root.next()[0]);
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks MIN over column 1 with one empty grouping set in SINGLE mode: exactly one result row is expected. */
+ @Test
+ public void singleMin() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MIN,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1), // the aggregate reads column 1, the int column of rowType
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of()); // one grouping set with no columns
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ agg.register(scan);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+ root.register(agg);
+
+ assertTrue(root.hasNext());
+ assertEquals(200, root.next()[0]); // smallest value among 200, 300, 1400, 1000
+ assertFalse(root.hasNext());
+ }
+
+ /** Checks MAX over column 1 with one empty grouping set in SINGLE mode: exactly one result row is expected. */
+ @Test
+ public void singleMax() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.MAX,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(1), // the aggregate reads column 1, the int column of rowType
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of()); // one grouping set with no columns
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ agg.register(scan);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+ root.register(agg);
+
+ assertTrue(root.hasNext());
+ assertEquals(1400, root.next()[0]); // largest value among 200, 300, 1400, 1000
+ assertFalse(root.hasNext());
+ }
+
+
+ /** Checks COUNT with an empty argument list and one empty grouping set in SINGLE mode: one result row is expected. */
+ @Test
+ public void singleCount() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 200),
+ row("Roman", 300),
+ row("Ivan", 1400),
+ row("Alexey", 1000)
+ ));
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.COUNT,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(), // no argument columns are passed to COUNT
+ -1,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null);
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of()); // one grouping set with no columns
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ HashAggregateNode<Object[]> agg = new HashAggregateNode<>(
+ ctx,
+ aggType,
+ SINGLE,
+ grpSets,
+ accFactory(ctx, call, SINGLE, rowType),
+ rowFactory()
+ );
+
+ agg.register(scan);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+ root.register(agg);
+
+ assertTrue(root.hasNext());
+ assertEquals(4, root.next()[0]); // the scan above supplies four rows
+ assertFalse(root.hasNext());
+ }
+
+ /** Asks the context's expression factory for an accumulators supplier covering the single {@code call}. */
+ protected Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(
+ ExecutionContext<Object[]> ctx,
+ AggregateCall call,
+ AggregateType type,
+ RelDataType rowType
+ ) {
+ return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType); // the call is wrapped into a one-element list
+ }
+
+ /** Creates a row factory for {@code Object[]} rows: only {@code create(Object...)} is usable, the no-arg variant fails. */
+ protected RowHandler.RowFactory<Object[]> rowFactory() {
+ return new RowHandler.RowFactory<Object[]>() {
+ /** Returns the shared {@link ArrayRowHandler#INSTANCE}. */
+ @Override public RowHandler<Object[]> handler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+
+ /** Not supported by this factory: always fails with an {@link AssertionError}. */
+ @Override public Object[] create() {
+ throw new AssertionError();
+ }
+
+ /** Uses the passed array itself as the row, without copying it. */
+ @Override public Object[] create(Object... fields) {
+ return fields;
+ }
+ };
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java
new file mode 100644
index 0000000..a1a55af
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ * Supplies {@link SortAggregateNode} chains to {@link BaseAggregateTest}: the scan is sorted on the grouping columns first.
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class SortAggregateExecutionTest extends BaseAggregateTest {
+ /** {@inheritDoc} */
+ @Override protected SingleNode<Object[]> createSingleAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall call,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ ) {
+ assert grpSets.size() == 1; // exactly one grouping set is accepted here
+
+ ImmutableBitSet grpSet = F.first(grpSets);
+
+ assert !grpSet.isEmpty() : "Not applicable for sort aggregate";
+
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(grpSet.asList())); // collation over the grouping columns
+
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ SortNode<Object[]> sort = new SortNode<>(ctx, inRowType, cmp); // chain built below: scan -> sort -> aggregate
+
+ sort.register(scan);
+
+ SortAggregateNode<Object[]> agg = new SortAggregateNode<>(
+ ctx,
+ aggRowType,
+ SINGLE,
+ grpSet,
+ accFactory(ctx, call, SINGLE, inRowType),
+ rowFactory,
+ cmp
+ );
+
+ agg.register(sort);
+
+ return agg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
+ ExecutionContext<Object[]> ctx,
+ ImmutableList<ImmutableBitSet> grpSets,
+ AggregateCall call,
+ RelDataType inRowType,
+ RelDataType aggRowType,
+ RowHandler.RowFactory<Object[]> rowFactory,
+ ScanNode<Object[]> scan
+ ) {
+ assert grpSets.size() == 1; // exactly one grouping set is accepted here
+
+ ImmutableBitSet grpSet = F.first(grpSets);
+
+ assert !grpSet.isEmpty() : "Not applicable for sort aggregate";
+
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(grpSet.asList())); // collation over the grouping columns
+
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ SortNode<Object[]> sort = new SortNode<>(ctx, inRowType, cmp); // chain built below: scan -> sort -> map -> reduce
+
+ sort.register(scan);
+
+ SortAggregateNode<Object[]> aggMap = new SortAggregateNode<>(
+ ctx,
+ aggRowType,
+ MAP,
+ grpSet,
+ accFactory(ctx, call, MAP, inRowType),
+ rowFactory,
+ cmp
+ );
+
+ aggMap.register(sort);
+
+ // The group fields are placed at the beginning of the output row (the planner
+ // does this with a Projection node over the aggregate input).
+ // The hash aggregate doesn't use the group set on the reducer because it sends GroupKey as an object.
+ ImmutableIntList reduceGrpFields = ImmutableIntList.copyOf(
+ IntStream.range(0, grpSet.cardinality()).boxed().collect(Collectors.toList())
+ );
+
+ RelCollation rdcCollation = RelCollations.of(reduceGrpFields);
+
+ Comparator<Object[]> rdcCmp = ctx.expressionFactory().comparator(rdcCollation);
+
+ SortAggregateNode<Object[]> aggRdc = new SortAggregateNode<>(
+ ctx,
+ aggRowType,
+ REDUCE,
+ ImmutableBitSet.of(reduceGrpFields),
+ accFactory(ctx, call, REDUCE, aggRowType),
+ rowFactory,
+ rdcCmp
+ );
+
+ aggRdc.register(aggMap);
+
+ return aggRdc;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 861970f..4ec8f16 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -176,6 +176,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
public static <T extends RelNode> List<T> findNodes(RelNode plan, Predicate<RelNode> pred) {
List<T> ret = new ArrayList<>();
+ if (pred.test(plan))
+ ret.add((T)plan);
+
plan.childrenAccept(
new RelVisitor() {
@Override public void visit(RelNode node, int ordinal, RelNode parent) {
@@ -325,6 +328,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** */
protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+ assertNotNull(rel);
+
rel = Cloner.clone(rel);
SchemaPlus schema = createRootSchema(false)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
new file mode 100644
index 0000000..f630ade
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@SuppressWarnings({"TypeMayBeWeakened"})
+@RunWith(Parameterized.class)
+public class AggregatePlannerTest extends AbstractPlannerTest {
+ /** Algorithm. */
+ @Parameterized.Parameter
+ public AggregateAlgorithm algo;
+
+ /** Builds the parameter list: one single-element array per {@link AggregateAlgorithm} constant. */
+ @Parameterized.Parameters(name = "Algorithm = {0}")
+ public static List<Object[]> parameters() {
+ return Stream.of(AggregateAlgorithm.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+ }
+
+ /** Broadcast table whose only index is on (val0, val1) while the query groups by grp0: expects the single aggregate.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void singleWithoutIndex() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1"); // columns 1 and 2 are VAL0 and VAL1
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT AVG(val0) FROM test GROUP BY grp0";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ algo.ruleToDisable
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(agg.getAggCallList()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ if (algo == AggregateAlgorithm.SORT)
+ assertNotNull(findFirstNode(phys, byClass(IgniteSort.class))); // the SORT variant must contain an IgniteSort node
+ }
+
+ /** Broadcast table indexed on (grp0, grp1) with a query grouping by grp0: expects the single aggregate.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void singleWithIndex() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_grp1"); // columns 3 and 4 are GRP0 and GRP1
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT AVG(val0) FILTER(WHERE val1 > 10) FROM test GROUP BY grp0";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ algo.ruleToDisable
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(agg.getAggCallList()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ if (algo == AggregateAlgorithm.SORT)
+ assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class))); // the SORT variant must contain an index scan
+ }
+
+ /** Affinity-distributed table without indexes, grouping by (grp1, grp0): expects both map and reduce aggregates.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void mapReduceGroupBy() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "test", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT AVG(val0) FILTER (WHERE val1 > 10) FROM test GROUP BY grp1, grp0";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ algo.ruleToDisable
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+ IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(rdcAgg.aggregateCalls()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(mapAgg.getAggCallList()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ if (algo == AggregateAlgorithm.SORT)
+ assertNotNull(findFirstNode(phys, byClass(IgniteSort.class))); // the SORT variant must contain an IgniteSort node
+ }
+
+ /**
+ * SELECT DISTINCT over (val0, val1), which are also the indexed columns: map and reduce nodes without aggregate calls.
+ * @throws Exception If failed.
+ */
+ @Test
+ @Ignore("Single aggregates must be disabled by hint: https://issues.apache.org/jira/browse/IGNITE-14274")
+ public void mapReduceDistinctWithIndex() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "test", "hash");
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1"); // columns 1 and 2 are VAL0 and VAL1
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT DISTINCT val0, val1 FROM test";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ algo.ruleToDisable
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+ IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+ Assert.assertTrue(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.isEmpty(rdcAgg.aggregateCalls()));
+
+ Assert.assertTrue(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.isEmpty(mapAgg.getAggCallList()));
+
+ if (algo == AggregateAlgorithm.SORT)
+ assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class))); // the SORT variant must contain an index scan
+ }
+
+ /** An aggregate without GROUP BY planned with "HashAggregateConverterRule" passed is expected to fail planning. */
+ @Test
+ public void notApplicableForSortAggregate() {
+ if (algo == AggregateAlgorithm.HASH)
+ return; // the scenario is exercised for the SORT parameter only
+
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "test", "hash");
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1"); // columns 1 and 2 are VAL0 and VAL1
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT MIN(val0) FROM test";
+
+ GridTestUtils.assertThrows(log,
+ () -> physicalPlan(
+ sql,
+ publicSchema,
+ "HashAggregateConverterRule" // same value as AggregateAlgorithm.SORT.ruleToDisable
+ ),
+ RelOptPlanner.CannotPlanException.class,
+ "There are not enough rules to produce a node with desired properties"
+ );
+ }
+
+ /** Aggregate algorithms used as test parameters, each with the plan node classes the tests look up. */
+ enum AggregateAlgorithm {
+ /** Sort-based aggregate node classes, paired with the rule name "HashAggregateConverterRule". */
+ SORT(
+ IgniteSortAggregate.class,
+ IgniteMapSortAggregate.class,
+ IgniteReduceSortAggregate.class,
+ "HashAggregateConverterRule"
+ ),
+
+ /** Hash-based aggregate node classes, paired with the rule name "SortAggregateConverterRule". */
+ HASH(
+ IgniteHashAggregate.class,
+ IgniteMapHashAggregate.class,
+ IgniteReduceHashAggregate.class,
+ "SortAggregateConverterRule"
+ );
+
+ /** Node class looked up by the single-phase tests. */
+ public final Class<? extends IgniteAggregateBase> single;
+
+ /** Node class looked up as the map-phase aggregate. */
+ public final Class<? extends IgniteAggregate> map;
+
+ /** Node class looked up as the reduce-phase aggregate. */
+ public final Class<? extends IgniteReduceAggregateBase> reduce;
+
+ /** Rule name the tests pass to {@code physicalPlan}. */
+ public final String ruleToDisable;
+
+ /** Stores the given node classes and rule name in the corresponding fields. */
+ AggregateAlgorithm(
+ Class<? extends IgniteAggregateBase> single,
+ Class<? extends IgniteAggregate> map,
+ Class<? extends IgniteReduceAggregateBase> reduce,
+ String ruleToDisable) {
+ this.single = single;
+ this.map = map;
+ this.reduce = reduce;
+ this.ruleToDisable = ruleToDisable;
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java
new file mode 100644
index 0000000..ff6f73e
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Planner test: a scalar AVG sub-query over an affinity-distributed table must plan map and reduce hash aggregates.
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class HashAggregateTest extends AbstractPlannerTest {
+ /** Plans the query and looks for {@link IgniteReduceHashAggregate} and {@link IgniteMapHashAggregate} computing AVG.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void test() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable employer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Employers", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("EMPS", employer);
+
+ String sql = "SELECT * FROM emps WHERE emps.salary = (SELECT AVG(emps.salary) FROM emps)";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema
+ );
+
+ assertNotNull(phys);
+
+ IgniteReduceHashAggregate rdcAgg = findFirstNode(phys, byClass(IgniteReduceHashAggregate.class));
+ IgniteMapHashAggregate mapAgg = findFirstNode(phys, byClass(IgniteMapHashAggregate.class));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(rdcAgg.aggregateCalls()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ Assert.assertThat(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.first(mapAgg.getAggCallList()).getAggregation(),
+ IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+ System.out.println("+++\n" + RelOptUtil.toString(phys)); // NOTE(review): prints the plan to stdout - looks like leftover debug output, confirm
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a14060a..a15470f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -631,99 +631,6 @@ public class PlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testAggregate() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable employer = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Employers", "hash");
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("EMPS", employer);
-
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
- String sql = "SELECT * FROM emps WHERE emps.salary = (SELECT AVG(emps.salary) FROM emps)";
-
- RelTraitDef<?>[] traitDefs = {
- DistributionTraitDef.INSTANCE,
- ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- RewindabilityTraitDef.INSTANCE,
- CorrelationTraitDef.INSTANCE
- };
-
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(F.first(nodes))
- .originatingNodeId(F.first(nodes))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .build())
- .logger(log)
- .query(sql)
- .parameters(2)
- .topologyVersion(AffinityTopologyVersion.NONE)
- .build();
-
- RelNode root;
-
- try (IgnitePlanner planner = ctx.planner()) {
- assertNotNull(planner);
-
- String qry = ctx.query();
-
- assertNotNull(qry);
-
- // Parse
- SqlNode sqlNode = planner.parse(qry);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- root = planner.convert(sqlNode);
-
- // Transformation chain
- root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
- // Transformation chain
- RelTraitSet desired = root.getCluster().traitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .simplify();
-
- root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
- }
-
- assertNotNull(root);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
public void testHepPlaner() throws Exception {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 4de7817..2925782 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -19,9 +19,12 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -37,6 +40,9 @@ import org.junit.runners.Suite;
NestedLoopJoinExecutionTest.class,
TableSpoolExecutionTest.class,
IndexSpoolExecutionTest.class,
+ HashAggregateExecutionTest.class,
+ HashAggregateSingleGroupExecutionTest.class,
+ SortAggregateExecutionTest.class,
})
public class ExecutionTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 088e15e..9c656ca 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,10 +22,11 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondary
import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
+import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
+import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
@@ -56,7 +57,8 @@ import org.junit.runners.Suite;
LimitOffsetTest.class,
SqlFieldsQueryUsageTest.class,
AggregatesIntegrationTest.class,
- MetadataIntegrationTest.class
+ MetadataIntegrationTest.class,
+ SortAggregateIntegrationTest.class,
})
public class IgniteCalciteTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index ad329f1..959021f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -32,7 +33,8 @@ import org.junit.runners.Suite;
PlannerTest.class,
CorrelatedNestedLoopJoinPlannerTest.class,
TableSpoolPlannerTest.class,
- IndexSpoolPlannerTest.class
+ IndexSpoolPlannerTest.class,
+ AggregatePlannerTest.class,
})
public class PlannerTestSuite {
}