You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/19 09:52:39 UTC
[ignite] branch sql-calcite updated: IGNITE-14274 Calcite:
refactoring aggregates converter rules: single and map/reduce
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 32b9fac IGNITE-14274 Calcite: refactoring aggregates converter rules: single and map/reduce
32b9fac is described below
commit 32b9facdd27960d53b7bcedec04415566eabb875
Author: tledkov <tl...@gridgain.com>
AuthorDate: Fri Mar 19 12:52:22 2021 +0300
IGNITE-14274 Calcite: refactoring aggregates converter rules: single and map/reduce
---
.../query/calcite/exec/LogicalRelImplementor.java | 24 +-
.../query/calcite/exec/rel/SortAggregateNode.java | 2 +
.../query/calcite/externalize/RelJson.java | 1 +
.../processors/query/calcite/prepare/Cloner.java | 16 +-
.../query/calcite/prepare/IgniteRelShuttle.java | 16 +-
.../query/calcite/prepare/PlannerPhase.java | 8 +-
.../query/calcite/rel/IgniteAggregateBase.java | 287 --------------------
.../query/calcite/rel/IgniteHashAggregate.java | 98 -------
.../query/calcite/rel/IgniteRelVisitor.java | 11 +-
.../calcite/rel/agg/IgniteHashAggregateBase.java | 46 ++++
.../calcite/rel/agg/IgniteMapAggregateBase.java | 86 ++++++
.../rel/{ => agg}/IgniteMapHashAggregate.java | 6 +-
.../rel/{ => agg}/IgniteMapSortAggregate.java | 17 +-
.../rel/{ => agg}/IgniteReduceAggregateBase.java | 49 +++-
.../rel/{ => agg}/IgniteReduceHashAggregate.java | 16 +-
.../rel/{ => agg}/IgniteReduceSortAggregate.java | 11 +-
.../calcite/rel/agg/IgniteSingleAggregateBase.java | 86 ++++++
.../IgniteSingleHashAggregate.java} | 49 +---
.../IgniteSingleSortAggregate.java} | 62 +----
.../calcite/rel/agg/IgniteSortAggregateBase.java | 71 +++++
.../calcite/rule/HashAggregateConverterRule.java | 84 +++++-
.../calcite/rule/SortAggregateConverterRule.java | 124 +++++++--
.../query/calcite/rule/UnionConverterRule.java | 68 +++--
.../processors/query/calcite/trait/TraitUtils.java | 39 ---
.../rel/HashAggregateSingleGroupExecutionTest.java | 2 +-
.../planner/AggregateDistinctPlannerTest.java | 150 +++++++++++
.../calcite/planner/AggregatePlannerTest.java | 105 ++------
.../calcite/planner/HashAggregatePlannerTest.java | 8 +-
.../query/calcite/planner/PlannerTest.java | 288 ---------------------
.../calcite/planner/SortAggregatePlannerTest.java | 114 +++++++-
.../query/calcite/planner/UnionPlannerTest.java | 240 +++++++++++++++++
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
32 files changed, 1186 insertions(+), 1000 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 0d21530..0f891bb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -59,29 +59,29 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -438,7 +438,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteHashAggregate rel) {
+ @Override public Node<Row> visit(IgniteSingleHashAggregate rel) {
AggregateType type = AggregateType.SINGLE;
RelDataType rowType = rel.getRowType();
@@ -484,10 +484,10 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
RelDataType rowType = rel.getRowType();
Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
- type, rel.aggregateCalls(), null);
+ type, rel.getAggregateCalls(), null);
RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+ HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
@@ -497,7 +497,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteSortAggregate rel) {
+ @Override public Node<Row> visit(IgniteSingleSortAggregate rel) {
AggregateType type = AggregateType.SINGLE;
RelDataType rowType = rel.getRowType();
@@ -568,7 +568,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
type,
- rel.aggregateCalls(),
+ rel.getAggregateCalls(),
null
);
@@ -578,7 +578,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
ctx,
rowType,
type,
- rel.groupSet(),
+ rel.getGroupSet(),
accFactory,
rowFactory,
expressionFactory.comparator(rel.collation())
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
index af63e34..eceb620 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
@@ -21,6 +21,7 @@ import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
+import java.util.Objects;
import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
@@ -84,6 +85,7 @@ public class SortAggregateNode<Row> extends AbstractNode<Row> implements SingleN
Comparator<Row> comp
) {
super(ctx, rowType);
+ assert Objects.nonNull(comp);
this.type = type;
this.accFactory = accFactory;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index af0d2cb..d454b9a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -217,6 +217,7 @@ class RelJson {
private static final List<String> PACKAGES =
ImmutableList.of(
"org.apache.ignite.internal.processors.query.calcite.rel.",
+ "org.apache.ignite.internal.processors.query.calcite.rel.agg.",
"org.apache.calcite.rel.",
"org.apache.calcite.rel.core.",
"org.apache.calcite.rel.logical.",
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 2069945..1b2d530 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -22,29 +22,29 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
@@ -190,7 +190,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteHashAggregate rel) {
+ @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
@@ -205,7 +205,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSortAggregate rel) {
+ @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 7e232f29..5f05cc6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -22,29 +22,29 @@ import java.util.List;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/** */
@@ -90,7 +90,7 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteHashAggregate rel) {
+ @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
return processNode(rel);
}
@@ -105,7 +105,7 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSortAggregate rel) {
+ @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
return processNode(rel);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 6f21db7..e108efa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -163,8 +163,12 @@ public enum PlannerPhase {
ValuesConverterRule.INSTANCE,
LogicalScanConverterRule.INDEX_SCAN,
LogicalScanConverterRule.TABLE_SCAN,
- HashAggregateConverterRule.INSTANCE,
- SortAggregateConverterRule.INSTANCE,
+ HashAggregateConverterRule.SINGLE,
+ HashAggregateConverterRule.MAP_REDUCE,
+ SortAggregateConverterRule.SINGLE,
+ SortAggregateConverterRule.MAP_REDUCE,
+ MergeJoinConverterRule.INSTANCE,
+ NestedLoopJoinConverterRule.INSTANCE,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
deleted file mode 100644
index 32ef760..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
-/**
- *
- */
-public abstract class IgniteAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
- /** {@inheritDoc} */
- protected IgniteAggregateBase(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode input,
- ImmutableBitSet groupSet,
- List<ImmutableBitSet> groupSets,
- List<AggregateCall> aggCalls
- ) {
- super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- protected IgniteAggregateBase(RelInput input) {
- super(changeTraits(input, IgniteConvention.INSTANCE));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- // Distribution propagation is based on next rules:
- // 1) Any aggregation is possible on single or broadcast distribution.
- // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
- // and all of input distribution keys are parts of aggregation group and vice versa.
- // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
- RelTraitSet in = inputTraits.get(0);
-
- IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
- RelDistribution.Type distrType = distribution.getType();
-
- switch (distrType) {
- case SINGLETON:
- case BROADCAST_DISTRIBUTED:
- return Pair.of(nodeTraits, ImmutableList.of(in.replace(distribution)));
-
- case RANDOM_DISTRIBUTED:
- if (!groupSet.isEmpty() && isSimple(this)) {
- IgniteDistribution outDistr = hash(range(0, groupSet.cardinality()));
- IgniteDistribution inDistr = hash(groupSet.asList());
-
- return Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in.replace(inDistr)));
- }
-
- break;
-
- case HASH_DISTRIBUTED:
- ImmutableIntList keys = distribution.getKeys();
-
- if (isSimple(this) && groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverseMapping(
- groupSet, getInput().getRowType().getFieldCount());
-
- List<Integer> srcKeys = new ArrayList<>(keys.size());
-
- for (int key : keys) {
- int src = mapping.getSourceOpt(key);
-
- if (src == -1)
- break;
-
- srcKeys.add(src);
- }
-
- if (srcKeys.size() == keys.size())
- return Pair.of(nodeTraits, ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
- }
-
- break;
-
- default:
- break;
- }
-
- return Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single())));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- // Aggregate is rewindable if its input is rewindable.
-
- RelTraitSet in = inputTraits.get(0);
-
- RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
- ? RewindabilityTrait.ONE_WAY
- : TraitUtils.rewindability(in);
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), ImmutableList.of(in.replace(rewindability))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- // Distribution propagation is based on next rules:
- // 1) Any aggregation is possible on single or broadcast distribution.
- // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
- // and all of input distribution keys are parts aggregation group.
- // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
- RelTraitSet in = inputTraits.get(0);
-
- List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
- IgniteDistribution distribution = TraitUtils.distribution(in);
-
- RelDistribution.Type distrType = distribution.getType();
-
- switch (distrType) {
- case SINGLETON:
- case BROADCAST_DISTRIBUTED:
- res.add(Pair.of(nodeTraits.replace(distribution), ImmutableList.of(in)));
-
- break;
-
- case HASH_DISTRIBUTED:
- if (isSimple(this)) {
- ImmutableIntList keys = distribution.getKeys();
-
- //Check that group by contains all key columns
- if (groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.mapping(
- groupSet, getInput().getRowType().getFieldCount());
-
- IgniteDistribution outDistr = distribution.apply(mapping);
-
- if (outDistr.getType() == HASH_DISTRIBUTED)
- res.add(Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in)));
- }
-
- //Map-reduce aggregates especial for non 'group by' query, like a select count(*) from table
- if (groupSet.isEmpty())
- res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(hash(keys, distribution.function())))));
- }
-
- break;
-
- case RANDOM_DISTRIBUTED:
- // Map-reduce aggregates
- if (isSimple(this)) {
- res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(random()))));
- res.add(Pair.of(nodeTraits.replace(broadcast()), ImmutableList.of(in.replace(random()))));
- }
-
- break;
-
- default:
- break;
- }
-
- if (!res.isEmpty())
- return res;
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single()))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits
- ) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
- inTraits));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull RelNode createNode(RelTraitSet outTraits, List<RelTraitSet> inTraits) {
- RelTraitSet in = inTraits.get(0);
-
- if (!isMapReduce(outTraits, in))
- return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
-
- if (U.assertionsEnabled()) {
- ImmutableList<RelTrait> diff = in.difference(outTraits);
-
- assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
- }
-
- RelNode map = createMapAggregate(
- getCluster(),
- in,
- convert(getInput(), in),
- groupSet,
- groupSets,
- aggCalls);
-
- return createReduceAggregate(
- getCluster(),
- outTraits,
- convert(map, outTraits),
- groupSet,
- groupSets,
- aggCalls,
- getRowType());
- }
-
- /** */
- protected abstract RelNode createReduceAggregate(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelNode input,
- ImmutableBitSet groupSet,
- ImmutableList<ImmutableBitSet> groupSets,
- List<AggregateCall> aggCalls,
- RelDataType rowType
- );
-
- /** */
- protected abstract RelNode createMapAggregate(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelNode input,
- ImmutableBitSet groupSet,
- ImmutableList<ImmutableBitSet> groupSets,
- List<AggregateCall> aggCalls
- );
-
- /** */
- private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
- return TraitUtils.distribution(out).satisfies(single())
- && TraitUtils.distribution(in).satisfies(random());
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
deleted file mode 100644
index a0f4613..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-
-/**
- *
- */
-public class IgniteHashAggregate extends IgniteAggregateBase {
- /** {@inheritDoc} */
- public IgniteHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- public IgniteHashAggregate(RelInput input) {
- super(input);
- }
-
- /** {@inheritDoc} */
- @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteHashAggregate(cluster, getTraitSet(), sole(inputs),
- getGroupSet(), getGroupSets(), getAggCallList());
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
- return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Since it's a hash aggregate it erases collation.
- return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
- }
-
- /** {@inheritDoc} */
- @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteMapHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls);
- }
-
- /** {@inheritDoc} */
- @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
- RelDataType rowType) {
- return new IgniteReduceHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, getRowType());
- }
-
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- return computeSelfCostHash(planner, mq);
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 8c620a6..e47dbf6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+
/**
* A visitor to traverse an Ignite relational nodes tree.
*/
@@ -79,7 +86,7 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteHashAggregate rel);
+ T visit(IgniteSingleHashAggregate rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
@@ -94,7 +101,7 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteSortAggregate rel);
+ T visit(IgniteSingleSortAggregate rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java
new file mode 100644
index 0000000..d4b1f44
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ *
+ */
+interface IgniteHashAggregateBase extends TraitsAwareIgniteRel {
+ /** {@inheritDoc} */
+ @Override default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Since it's a hash aggregate it erases collation.
+ return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
+ }
+
+ /** {@inheritDoc} */
+ @Override default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Since it's a hash aggregate it erases collation.
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+ }
+
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java
new file mode 100644
index 0000000..7d852532
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ *
+ */
+public abstract class IgniteMapAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
+ /** {@inheritDoc} */
+ protected IgniteMapAggregateBase(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ ) {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ protected IgniteMapAggregateBase(RelInput input) {
+ super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits, ImmutableList.of(inputTraits.get(0))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ RelTraitSet in = inputTraits.get(0);
+
+ if (TraitUtils.distribution(in).satisfies(IgniteDistributions.single()))
+ return ImmutableList.of();
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.distribution(in)), ImmutableList.of(in)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+ inTraits));
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
similarity index 91%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
index 49bdf9e..c6febb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
@@ -33,13 +33,15 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
-public class IgniteMapHashAggregate extends IgniteAggregate {
+public class IgniteMapHashAggregate extends IgniteMapAggregateBase implements IgniteHashAggregateBase {
/** */
public IgniteMapHashAggregate(
RelOptCluster cluster,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
similarity index 84%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
index 171b747..53a78d8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
import java.util.Objects;
@@ -36,14 +36,16 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
/**
*
*/
-public class IgniteMapSortAggregate extends IgniteAggregate {
+public class IgniteMapSortAggregate extends IgniteMapAggregateBase implements IgniteSortAggregateBase {
/** Collation. */
private final RelCollation collation;
@@ -67,7 +69,7 @@ public class IgniteMapSortAggregate extends IgniteAggregate {
/** */
public IgniteMapSortAggregate(RelInput input) {
- super(changeTraits(input, IgniteConvention.INSTANCE));
+ super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
collation = input.getCollation();
@@ -82,14 +84,15 @@ public class IgniteMapSortAggregate extends IgniteAggregate {
ImmutableBitSet groupSet,
List<ImmutableBitSet> groupSets,
List<AggregateCall> aggCalls) {
- return new IgniteMapSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
+ return new IgniteMapSortAggregate(
+ getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteMapSortAggregate(
cluster,
- getTraitSet(),
+ getTraitSet().replace(collation),
sole(inputs),
getGroupSet(),
getGroupSets(),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
similarity index 66%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
index cfab999..0c3f171 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
@@ -32,12 +32,18 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
/**
*
*/
-public abstract class IgniteReduceAggregateBase extends SingleRel implements IgniteRel {
+public abstract class IgniteReduceAggregateBase extends SingleRel implements TraitsAwareIgniteRel {
/** */
protected final ImmutableBitSet groupSet;
@@ -102,17 +108,50 @@ public abstract class IgniteReduceAggregateBase extends SingleRel implements Ign
}
/** */
- public ImmutableBitSet groupSet() {
+ public ImmutableBitSet getGroupSet() {
return groupSet;
}
/** */
- public List<ImmutableBitSet> groupSets() {
+ public List<ImmutableBitSet> getGroupSets() {
return groupSets;
}
/** */
- public List<AggregateCall> aggregateCalls() {
+ public List<AggregateCall> getAggregateCalls() {
return aggCalls;
}
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(
+ Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ RelTraitSet in = inputTraits.get(0);
+
+ return ImmutableList.of(
+ Pair.of(
+ nodeTraits.replace(IgniteDistributions.single()),
+ ImmutableList.of(in.replace(IgniteDistributions.single()))
+ )
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+ inTraits));
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
similarity index 83%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
index dcda3ac..cc3741f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
@@ -15,29 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
-public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
+public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase implements IgniteHashAggregateBase {
/** */
public IgniteReduceHashAggregate(
RelOptCluster cluster,
@@ -101,4 +106,11 @@ public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
0
);
}
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ ImmutableList.of(inTraits.get(0).replace(RelCollations.EMPTY))));
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
similarity index 89%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
index bb71bf1..83dd626 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
import java.util.Objects;
@@ -34,11 +34,14 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
/**
*
*/
-public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
+public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase implements IgniteSortAggregateBase {
/** Collation. */
private final RelCollation collation;
@@ -80,7 +83,7 @@ public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
groupSets,
aggCalls,
rowType,
- collation
+ TraitUtils.collation(traitSet)
);
}
@@ -88,7 +91,7 @@ public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteReduceSortAggregate(
cluster,
- getTraitSet(),
+ getTraitSet().replace(collation),
sole(inputs),
groupSet,
groupSets,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
new file mode 100644
index 0000000..348f4b6
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ *
+ */
+public abstract class IgniteSingleAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
+ /** {@inheritDoc} */
+ protected IgniteSingleAggregateBase(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls
+ ) {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+ }
+
+ /** {@inheritDoc} */
+ protected IgniteSingleAggregateBase(RelInput input) {
+ super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits, ImmutableList.of(inputTraits.get(0))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ RelTraitSet in = inputTraits.get(0);
+
+ if (!TraitUtils.distribution(in).satisfies(IgniteDistributions.single()))
+ return ImmutableList.of();
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), ImmutableList.of(in)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+ inTraits));
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
similarity index 55%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
index 49bdf9e..a965595 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
@@ -28,43 +28,32 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
/**
*
*/
-public class IgniteMapHashAggregate extends IgniteAggregate {
- /** */
- public IgniteMapHashAggregate(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode input,
- ImmutableBitSet groupSet,
- List<ImmutableBitSet> groupSets,
- List<AggregateCall> aggCalls
- ) {
+public class IgniteSingleHashAggregate extends IgniteSingleAggregateBase implements IgniteHashAggregateBase {
+ /** {@inheritDoc} */
+ public IgniteSingleHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
}
- /** */
- public IgniteMapHashAggregate(RelInput input) {
+ /** {@inheritDoc} */
+ public IgniteSingleHashAggregate(RelInput input) {
super(input);
}
/** {@inheritDoc} */
@Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteMapHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+ return new IgniteSingleHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteMapHashAggregate(cluster, getTraitSet(), sole(inputs),
+ return new IgniteSingleHashAggregate(cluster, getTraitSet(), sole(inputs),
getGroupSet(), getGroupSets(), getAggCallList());
}
@@ -74,24 +63,6 @@ public class IgniteMapHashAggregate extends IgniteAggregate {
}
/** {@inheritDoc} */
- @Override protected RelDataType deriveRowType() {
- return rowType(Commons.typeFactory(getCluster()));
- }
-
- /** */
- public static RelDataType rowType(RelDataTypeFactory typeFactory) {
- assert typeFactory instanceof IgniteTypeFactory;
-
- RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
- builder.add("GROUP_ID", typeFactory.createJavaType(byte.class));
- builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
- builder.add("AGG_DATA", typeFactory.createArrayType(typeFactory.createJavaType(Accumulator.class), -1));
-
- return builder.build();
- }
-
- /** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
return computeSelfCostHash(planner, mq);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
similarity index 55%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
index 0a2e3e5..1171506 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
@@ -15,40 +15,36 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
/**
*
*/
-public class IgniteSortAggregate extends IgniteAggregateBase {
+public class IgniteSingleSortAggregate extends IgniteSingleAggregateBase implements IgniteSortAggregateBase {
/** Collation. */
private final RelCollation collation;
/** {@inheritDoc} */
- public IgniteSortAggregate(
+ public IgniteSingleSortAggregate(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
@@ -65,7 +61,7 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
}
/** {@inheritDoc} */
- public IgniteSortAggregate(RelInput input) {
+ public IgniteSingleSortAggregate(RelInput input) {
super(input);
collation = input.getCollation();
@@ -76,12 +72,12 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
/** {@inheritDoc} */
@Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteSortAggregate(getCluster(), traitSet.replace(collation), input, groupSet, groupSets, aggCalls);
+ return new IgniteSingleSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteSortAggregate(cluster, getTraitSet(), sole(inputs),
+ return new IgniteSingleSortAggregate(cluster, getTraitSet().replace(collation), sole(inputs),
getGroupSet(), getGroupSets(), getAggCallList());
}
@@ -96,48 +92,6 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
- RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
- ) {
- RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));
-
- return Pair.of(nodeTraits.replace(collation),
- ImmutableList.of(inputTraits.get(0).replace(collation)));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
- RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
- ) {
- RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
-
- List<RelCollation> satisfiedCollations = TraitUtils.permute(groupSet.asList()).stream()
- .filter(col -> inputCollation.satisfies(col))
- .collect(Collectors.toList());
-
- if (satisfiedCollations.isEmpty())
- return ImmutableList.of();
-
- return satisfiedCollations.stream()
- .map(col -> Pair.of(nodeTraits.replace(col), inputTraits))
- .collect(Collectors.toList());
- }
-
- /** {@inheritDoc} */
- @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new IgniteMapSortAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, collation);
- }
-
- /** {@inheritDoc} */
- @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
- RelDataType rowType) {
- return new IgniteReduceSortAggregate(getCluster(), traits, input, groupSet, groupSets,
- aggCalls, getRowType(), collation);
- }
-
- /** {@inheritDoc} */
@Override public RelCollation collation() {
assert collation.equals(super.collation());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java
new file mode 100644
index 0000000..8f50c4d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
+
+/**
+ *
+ */
+interface IgniteSortAggregateBase extends TraitsAwareIgniteRel {
+ /**
+ * Returns a bit set of the grouping fields.
+ *
+ * @return bit set of ordinals of grouping fields
+ */
+ ImmutableBitSet getGroupSet();
+
+ /** {@inheritDoc} */
+ @Override default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+ RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+ ) {
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(getGroupSet().asList()));
+
+ return Pair.of(nodeTraits.replace(collation),
+ ImmutableList.of(inputTraits.get(0).replace(collation)));
+ }
+
+ /** {@inheritDoc} */
+ @Override default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+ RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+ ) {
+ RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
+
+ List<Integer> newCollation = maxPrefix(inputCollation.getKeys(), getGroupSet().asSet());
+
+ if (newCollation.size() < getGroupSet().cardinality())
+ return ImmutableList.of();
+
+ return ImmutableList.of(Pair.of(
+ nodeTraits.replace(RelCollations.of(ImmutableIntList.copyOf(newCollation))),
+ inputTraits
+ ));
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index b1cf2e2a..4e86eb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -26,29 +26,85 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
/**
*
*/
-public class HashAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+public class HashAggregateConverterRule {
/** */
- public static final RelOptRule INSTANCE = new HashAggregateConverterRule();
+ public static final RelOptRule SINGLE = new HashSingleAggregateConverterRule();
/** */
- public HashAggregateConverterRule() {
- super(LogicalAggregate.class, "HashAggregateConverterRule");
+ public static final RelOptRule MAP_REDUCE = new HashMapReduceAggregateConverterRule();
+
+ /** */
+ private HashAggregateConverterRule() {
+ // No-op.
+ }
+
+ /** */
+ private static class HashSingleAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+ /** */
+ HashSingleAggregateConverterRule() {
+ super(LogicalAggregate.class, "HashSingleAggregateConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+ LogicalAggregate agg) {
+ RelOptCluster cluster = agg.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ RelNode input = convert(agg.getInput(), inTrait);
+
+ return new IgniteSingleHashAggregate(
+ cluster,
+ outTrait,
+ input,
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList()
+ );
+ }
}
- /** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
- LogicalAggregate rel) {
- RelOptCluster cluster = rel.getCluster();
- RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelNode input = convert(rel.getInput(), inTrait);
+ /** */
+ private static class HashMapReduceAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+ /** */
+ HashMapReduceAggregateConverterRule() {
+ super(LogicalAggregate.class, "HashMapReduceAggregateConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+ LogicalAggregate agg) {
+ RelOptCluster cluster = agg.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelNode input = convert(agg.getInput(), inTrait);
+
+ RelNode map = new IgniteMapHashAggregate(
+ cluster,
+ outTrait,
+ input,
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList()
+ );
- return new IgniteHashAggregate(cluster, outTrait, input,
- rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
+ return new IgniteReduceHashAggregate(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(map, inTrait.replace(IgniteDistributions.single())),
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList(),
+ agg.getRowType()
+ );
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
index a4dc10a..a8e304b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
@@ -29,40 +29,108 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.typedef.F;
-/** */
-public class SortAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+/**
+ *
+ */
+public class SortAggregateConverterRule {
+ /** */
+ public static final RelOptRule SINGLE = new SortSingleAggregateConverterRule();
+
+ /** */
+ public static final RelOptRule MAP_REDUCE = new SortMapReduceAggregateConverterRule();
+
/** */
- public static final RelOptRule INSTANCE = new SortAggregateConverterRule();
+ private SortAggregateConverterRule() {
+ // No-op.
+ }
/** */
- public SortAggregateConverterRule() {
- super(LogicalAggregate.class, "SortAggregateConverterRule");
+ private static class SortSingleAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+ /** */
+ SortSingleAggregateConverterRule() {
+ super(LogicalAggregate.class, "SortSingleAggregateConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+ LogicalAggregate agg) {
+ // Applicable only for GROUP BY
+ if (F.isEmpty(agg.getGroupSet()) || agg.getGroupSets().size() > 1)
+ return null;
+
+ RelOptCluster cluster = agg.getCluster();
+ RelNode input = agg.getInput();
+
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(agg.getGroupSet().asList()));
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replace(collation)
+ .replace(IgniteDistributions.single());
+
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replace(collation)
+ .replace(IgniteDistributions.single());
+
+ return new IgniteSingleSortAggregate(
+ cluster,
+ outTrait,
+ convert(input, inTrait),
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList()
+ );
+ }
}
- /** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
- LogicalAggregate rel) {
- // Applicable only for GROUP BY
- if (F.isEmpty(rel.getGroupSet()) || rel.getGroupSets().size() > 1)
- return null;
-
- RelOptCluster cluster = rel.getCluster();
- RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelNode input = rel.getInput();
-
- RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(rel.getGroupSet().asList()));
-
- return new IgniteSortAggregate(
- cluster,
- outTrait.replace(collation),
- convert(input, inTrait.replace(collation)),
- rel.getGroupSet(),
- rel.getGroupSets(),
- rel.getAggCallList()
- );
+ /** */
+ private static class SortMapReduceAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+ /** */
+ SortMapReduceAggregateConverterRule() {
+ super(LogicalAggregate.class, "SortMapReduceAggregateConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+ LogicalAggregate agg) {
+ // Applicable only for GROUP BY
+ if (F.isEmpty(agg.getGroupSet()) || agg.getGroupSets().size() > 1)
+ return null;
+
+ RelOptCluster cluster = agg.getCluster();
+ RelNode input = agg.getInput();
+
+ RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(agg.getGroupSet().asList()));
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replace(collation);
+
+ RelNode map = new IgniteMapSortAggregate(
+ cluster,
+ outTrait,
+ convert(input, inTrait),
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList(),
+ collation
+ );
+
+ return new IgniteReduceSortAggregate(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(map, inTrait.replace(IgniteDistributions.single())),
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList(),
+ agg.getRowType(),
+ collation
+ );
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
index cd85348..46a438c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
@@ -19,45 +19,77 @@ package org.apache.ignite.internal.processors.query.calcite.rule;
import java.util.List;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.logical.LogicalUnion;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
-public class UnionConverterRule extends AbstractIgniteConverterRule<LogicalUnion> {
- /** */
- public static final RelOptRule INSTANCE = new UnionConverterRule();
+public class UnionConverterRule extends RelRule<UnionConverterRule.Config> {
+ /** Instance. */
+ public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
/** */
- public UnionConverterRule() {
- super(LogicalUnion.class, "UnionConverterRule");
+ public UnionConverterRule(Config cfg) {
+ super(cfg);
}
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalUnion rel) {
- RelOptCluster cluster = rel.getCluster();
+ @Override public void onMatch(RelOptRuleCall call) {
+ final LogicalUnion union = call.rel(0);
+
+ RelOptCluster cluster = union.getCluster();
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE);
- List<RelNode> inputs = Commons.transform(rel.getInputs(), input -> convert(input, traits));
+ List<RelNode> inputs = Commons.transform(union.getInputs(), input -> convert(input, traits));
+
+ RelNode res = new IgniteUnionAll(cluster, traits, inputs);
+
+ if (!union.all) {
+ final RelBuilder relBuilder = relBuilderFactory.create(union.getCluster(), null);
+
+ relBuilder
+ .push(res)
+ .aggregate(relBuilder.groupKey(ImmutableBitSet.range(union.getRowType().getFieldCount())));
+
+ res = convert(relBuilder.build(), union.getTraitSet());
+ }
+
+ call.transformTo(res);
+ }
- PhysicalNode res = new IgniteUnionAll(cluster, traits, inputs);
+ /**
+ *
+ */
+ public interface Config extends RelRule.Config {
+ /** */
+ UnionConverterRule.Config DEFAULT = RelRule.Config.EMPTY
+ .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+ .withDescription("UnionConverterRule")
+ .as(UnionConverterRule.Config.class)
+ .withOperandFor(LogicalUnion.class);
- if (!rel.all)
- res = new IgniteHashAggregate(cluster, traits, res,
- ImmutableBitSet.range(rel.getRowType().getFieldCount()), null, ImmutableList.of());
+ /** Defines an operand tree for the given classes. */
+ default UnionConverterRule.Config withOperandFor(Class<? extends LogicalUnion> union) {
+ return withOperandSupplier(
+ o0 -> o0.operand(union).anyInputs()
+ )
+ .as(UnionConverterRule.Config.class);
+ }
- return res;
+ /** {@inheritDoc} */
+ @Override default UnionConverterRule toRule() {
+ return new UnionConverterRule(this);
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 6cd77f1..04094fd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,7 +49,6 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ControlFlowException;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
@@ -417,42 +414,6 @@ public class TraitUtils {
}
/**
- * Creates collations list with all permutation of specified keys.
- *
- * @param keys The keys to create collation from.
- * @return New collation.
- */
- public static List<RelCollation> permute(List<Integer> keys) {
- keys = new ArrayList<>(keys);
-
- List<RelCollation> res = new ArrayList<>();
-
- int[] indexes = new int[keys.size()];
- Arrays.fill(indexes, 0);
-
- res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
-
- int i = 0;
-
- while (i < keys.size()) {
- if (indexes[i] < i) {
- Collections.swap(keys, i % 2 == 0 ? 0 : indexes[i], i);
-
- res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
-
- indexes[i]++;
- i = 0;
- }
- else {
- indexes[i] = 0;
- i++;
- }
- }
-
- return res;
- }
-
- /**
* @param elem Elem.
*/
private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 5f99aa3..40752d6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java
new file mode 100644
index 0000000..8bef6fb
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class AggregateDistinctPlannerTest extends AbstractAggregatePlannerTest {
+ /** Algorithm. */
+ @Parameterized.Parameter
+ public AggregateAlgorithm algo;
+
+ /** */
+ @Parameterized.Parameters(name = "Algorithm = {0}")
+ public static List<Object[]> parameters() {
+ return Stream.of(AggregateAlgorithm.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void mapReduceDistinctWithIndex() throws Exception {
+ TestTable tbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT DISTINCT val0, val1 FROM test";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ algo.rulesToDisable
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+ IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+ Assert.assertTrue(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.isEmpty(rdcAgg.getAggregateCalls()));
+
+ Assert.assertTrue(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ F.isEmpty(mapAgg.getAggCallList()));
+
+ if (algo == AggregateAlgorithm.SORT)
+ assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+ }
+
+ /** */
+ enum AggregateAlgorithm {
+ /** */
+ SORT(
+ IgniteSingleSortAggregate.class,
+ IgniteMapSortAggregate.class,
+ IgniteReduceSortAggregate.class,
+ "HashSingleAggregateConverterRule",
+ "HashMapReduceAggregateConverterRule",
+ "SortSingleAggregateConverterRule"
+ ),
+
+ /**
+ *
+ */
+ HASH(
+ IgniteSingleHashAggregate.class,
+ IgniteMapHashAggregate.class,
+ IgniteReduceHashAggregate.class,
+ "SortSingleAggregateConverterRule",
+ "SortMapReduceAggregateConverterRule",
+ "HashSingleAggregateConverterRule"
+ );
+
+ /** */
+ public final Class<? extends IgniteSingleAggregateBase> single;
+
+ /** */
+ public final Class<? extends IgniteMapAggregateBase> map;
+
+ /** */
+ public final Class<? extends IgniteReduceAggregateBase> reduce;
+
+ /** */
+ public final String[] rulesToDisable;
+
+ /** */
+ AggregateAlgorithm(
+ Class<? extends IgniteSingleAggregateBase> single,
+ Class<? extends IgniteMapAggregateBase> map,
+ Class<? extends IgniteReduceAggregateBase> reduce,
+ String... rulesToDisable) {
+ this.single = single;
+ this.map = map;
+ this.reduce = reduce;
+ this.rulesToDisable = rulesToDisable;
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
index 3ba52ba..1d36838 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -26,25 +26,22 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.fun.SqlAvgAggFunction;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregateBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregateBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.util.typedef.F;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -52,7 +49,6 @@ import org.junit.runners.Parameterized;
/**
*
*/
-@SuppressWarnings({"TypeMayBeWeakened"})
@RunWith(Parameterized.class)
public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
/** Algorithm. */
@@ -81,12 +77,12 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- algo.ruleToDisable
+ algo.rulesToDisable
);
checkSplitAndSerialization(phys, publicSchema);
- IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+ IgniteSingleAggregateBase agg = findFirstNode(phys, byClass(algo.single));
assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
@@ -104,8 +100,6 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
*/
@Test
public void singleWithIndex() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
TestTable tbl = createBroadcastTable().addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_grp1");
IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
@@ -117,12 +111,12 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- algo.ruleToDisable
+ algo.rulesToDisable
);
checkSplitAndSerialization(phys, publicSchema);
- IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+ IgniteSingleAggregateBase agg = findFirstNode(phys, byClass(algo.single));
assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
@@ -151,20 +145,20 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- algo.ruleToDisable
+ algo.rulesToDisable
);
checkSplitAndSerialization(phys, publicSchema);
- IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+ IgniteMapAggregateBase mapAgg = findFirstNode(phys, byClass(algo.map));
IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
- assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
Assert.assertThat(
"Invalid plan\n" + RelOptUtil.toString(phys),
- F.first(rdcAgg.aggregateCalls()).getAggregation(),
+ F.first(rdcAgg.getAggregateCalls()).getAggregation(),
IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
Assert.assertThat(
@@ -176,89 +170,46 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
}
- /**
- *
- * @throws Exception If failed.
- */
- @Test
- @Ignore("Single aggregates must be disabled by hint: https://issues.apache.org/jira/browse/IGNITE-14274")
- public void mapReduceDistinctWithIndex() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable tbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("TEST", tbl);
-
- String sql = "SELECT DISTINCT val0, val1 FROM test";
-
- IgniteRel phys = physicalPlan(
- sql,
- publicSchema,
- algo.ruleToDisable
- );
-
- checkSplitAndSerialization(phys, publicSchema);
-
- IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
- IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
-
- assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
- assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
-
- Assert.assertTrue(
- "Invalid plan\n" + RelOptUtil.toString(phys),
- F.isEmpty(rdcAgg.aggregateCalls()));
-
- Assert.assertTrue(
- "Invalid plan\n" + RelOptUtil.toString(phys),
- F.isEmpty(mapAgg.getAggCallList()));
-
- if (algo == AggregateAlgorithm.SORT)
- assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
- }
-
/** */
enum AggregateAlgorithm {
/** */
SORT(
- IgniteSortAggregate.class,
+ IgniteSingleSortAggregate.class,
IgniteMapSortAggregate.class,
IgniteReduceSortAggregate.class,
- "HashAggregateConverterRule"
+ "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
),
/** */
HASH(
- IgniteHashAggregate.class,
+ IgniteSingleHashAggregate.class,
IgniteMapHashAggregate.class,
IgniteReduceHashAggregate.class,
- "SortAggregateConverterRule"
+ "SortSingleAggregateConverterRule", "SortMapReduceAggregateConverterRule"
);
/** */
- public final Class<? extends IgniteAggregateBase> single;
+ public final Class<? extends IgniteSingleAggregateBase> single;
/** */
- public final Class<? extends IgniteAggregate> map;
+ public final Class<? extends IgniteMapAggregateBase> map;
/** */
public final Class<? extends IgniteReduceAggregateBase> reduce;
/** */
- public final String ruleToDisable;
+ public final String[] rulesToDisable;
/** */
AggregateAlgorithm(
- Class<? extends IgniteAggregateBase> single,
- Class<? extends IgniteAggregate> map,
+ Class<? extends IgniteSingleAggregateBase> single,
+ Class<? extends IgniteMapAggregateBase> map,
Class<? extends IgniteReduceAggregateBase> reduce,
- String ruleToDisable) {
+ String... rulesToDisable) {
this.single = single;
this.map = map;
this.reduce = reduce;
- this.ruleToDisable = ruleToDisable;
+ this.rulesToDisable = rulesToDisable;
}
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
index e7596a4..6cdbe2f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
@@ -27,9 +27,9 @@ import org.apache.calcite.sql.fun.SqlCountAggFunction;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -95,7 +95,7 @@ public class HashAggregatePlannerTest extends AbstractAggregatePlannerTest {
Assert.assertThat(
"Invalid plan\n" + RelOptUtil.toString(phys),
- F.first(rdcAgg.aggregateCalls()).getAggregation(),
+ F.first(rdcAgg.getAggregateCalls()).getAggregation(),
IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
Assert.assertThat(
@@ -132,7 +132,7 @@ public class HashAggregatePlannerTest extends AbstractAggregatePlannerTest {
Assert.assertThat(
"Invalid plan\n" + RelOptUtil.toString(phys),
- F.first(rdcAgg.aggregateCalls()).getAggregation(),
+ F.first(rdcAgg.getAggregateCalls()).getAggregation(),
IsInstanceOf.instanceOf(SqlCountAggFunction.class));
Assert.assertThat(
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 08a2595..42787033 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -339,294 +339,6 @@ public class PlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testUnion() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable tbl1 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table1", "hash");
- }
- };
-
- TestTable tbl2 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table2", "hash");
- }
- };
-
- TestTable tbl3 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table3", "hash");
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("TABLE1", tbl1);
- publicSchema.addTable("TABLE2", tbl2);
- publicSchema.addTable("TABLE3", tbl3);
-
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
- String sql = "" +
- "SELECT * FROM table1 " +
- "UNION " +
- "SELECT * FROM table2 " +
- "UNION " +
- "SELECT * FROM table3 ";
-
- RelTraitDef<?>[] traitDefs = {
- DistributionTraitDef.INSTANCE,
- ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- RewindabilityTraitDef.INSTANCE,
- CorrelationTraitDef.INSTANCE
- };
-
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(F.first(nodes))
- .originatingNodeId(F.first(nodes))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .build())
- .logger(log)
- .query(sql)
- .parameters(2)
- .topologyVersion(AffinityTopologyVersion.NONE)
- .build();
-
- RelNode root;
-
- try (IgnitePlanner planner = ctx.planner()) {
- assertNotNull(planner);
-
- String qry = ctx.query();
-
- assertNotNull(qry);
-
- // Parse
- SqlNode sqlNode = planner.parse(qry);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- root = planner.convert(sqlNode);
-
- // Transformation chain
- root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
- // Transformation chain
- RelTraitSet desired = root.getCluster().traitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .simplify();
-
- root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
- }
-
- assertNotNull(root);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testUnionAll() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable tbl1 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table1", "hash");
- }
- };
-
- TestTable tbl2 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table2", "hash");
- }
- };
-
- TestTable tbl3 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("NAME", f.createJavaType(String.class))
- .add("SALARY", f.createJavaType(Double.class))
- .build()) {
-
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
- return ColocationGroup.forAssignments(Arrays.asList(
- select(nodes, 0, 1),
- select(nodes, 1, 2),
- select(nodes, 2, 0),
- select(nodes, 0, 1),
- select(nodes, 1, 2)
- ));
- }
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "Table3", "hash");
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("TABLE1", tbl1);
- publicSchema.addTable("TABLE2", tbl2);
- publicSchema.addTable("TABLE3", tbl3);
-
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
- String sql = "" +
- "SELECT * FROM table1 " +
- "UNION ALL " +
- "SELECT * FROM table2 " +
- "UNION ALL " +
- "SELECT * FROM table3 ";
-
- RelTraitDef<?>[] traitDefs = {
- DistributionTraitDef.INSTANCE,
- ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- RewindabilityTraitDef.INSTANCE,
- CorrelationTraitDef.INSTANCE
- };
-
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(F.first(nodes))
- .originatingNodeId(F.first(nodes))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .build())
- .logger(log)
- .query(sql)
- .parameters(2)
- .topologyVersion(AffinityTopologyVersion.NONE)
- .build();
-
- RelNode root;
-
- try (IgnitePlanner planner = ctx.planner()) {
- assertNotNull(planner);
-
- String qry = ctx.query();
-
- assertNotNull(qry);
-
- // Parse
- SqlNode sqlNode = planner.parse(qry);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- root = planner.convert(sqlNode);
-
- // Transformation chain
- root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
- // Transformation chain
- RelTraitSet desired = root.getCluster().traitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .simplify();
-
- root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
- }
-
- assertNotNull(root);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
public void testHepPlaner() throws Exception {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
index d5bd8a6..88f9cdb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
@@ -17,10 +17,24 @@
package org.apache.ignite.internal.processors.query.calcite.planner;
+import java.util.Arrays;
+
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -46,10 +60,108 @@ public class SortAggregatePlannerTest extends AbstractAggregatePlannerTest {
() -> physicalPlan(
sqlMin,
publicSchema,
- "HashAggregateConverterRule"
+ "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
),
RelOptPlanner.CannotPlanException.class,
"There are not enough rules to produce a node with desired properties"
);
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void collationPermuteSingle() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_1");
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT MIN(val0) FROM test GROUP BY grp1, grp0";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
+ );
+
+ IgniteSingleSortAggregate agg = findFirstNode(phys, byClass(IgniteSingleSortAggregate.class));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+ assertNull(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ findFirstNode(phys, byClass(IgniteSort.class))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void collationPermuteMapReduce() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL0", f.createJavaType(Integer.class))
+ .add("VAL1", f.createJavaType(Integer.class))
+ .add("GRP0", f.createJavaType(Integer.class))
+ .add("GRP1", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "test", "hash");
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_1");
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT MIN(val0) FROM test GROUP BY grp1, grp0";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
+ );
+
+ IgniteReduceSortAggregate agg = findFirstNode(phys, byClass(IgniteReduceSortAggregate.class));
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+ assertNull(
+ "Invalid plan\n" + RelOptUtil.toString(phys),
+ findFirstNode(phys, byClass(IgniteSort.class))
+ );
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java
new file mode 100644
index 0000000..8847904
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+
+/**
+ *
+ */
+//@WithSystemProperty(key = "calcite.debug", value = "true")
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class UnionPlannerTest extends AbstractPlannerTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUnion() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl1 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table1", "hash");
+ }
+ };
+
+ TestTable tbl2 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table2", "hash");
+ }
+ };
+
+ TestTable tbl3 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table3", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TABLE1", tbl1);
+ publicSchema.addTable("TABLE2", tbl2);
+ publicSchema.addTable("TABLE3", tbl3);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "" +
+ "SELECT * FROM table1 " +
+ "UNION " +
+ "SELECT * FROM table2 " +
+ "UNION " +
+ "SELECT * FROM table3 ";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema
+ );
+
+ System.out.println("+++ " + RelOptUtil.toString(phys));
+
+ assertNotNull(phys);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUnionAll() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable tbl1 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table1", "hash");
+ }
+ };
+
+ TestTable tbl2 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table2", "hash");
+ }
+ };
+
+ TestTable tbl3 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build()) {
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Table3", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TABLE1", tbl1);
+ publicSchema.addTable("TABLE2", tbl2);
+ publicSchema.addTable("TABLE3", tbl3);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "" +
+ "SELECT * FROM table1 " +
+ "UNION ALL " +
+ "SELECT * FROM table2 " +
+ "UNION ALL " +
+ "SELECT * FROM table3 ";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema
+ );
+
+ System.out.println("+++ " + RelOptUtil.toString(phys));
+
+ assertNotNull(phys);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index c6cb5ba..50c715f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testsuites;
+import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
@@ -38,6 +39,7 @@ import org.junit.runners.Suite;
TableSpoolPlannerTest.class,
IndexSpoolPlannerTest.class,
AggregatePlannerTest.class,
+ AggregateDistinctPlannerTest.class,
HashAggregatePlannerTest.class,
SortAggregatePlannerTest.class,
JoinColocationPlannerTest.class,