You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/19 09:52:39 UTC

[ignite] branch sql-calcite updated: IGNITE-14274 Calcite: refactoring aggregates converter rules: single and map/reduce

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 32b9fac  IGNITE-14274 Calcite: refactoring aggregates converter rules: single and map/reduce
32b9fac is described below

commit 32b9facdd27960d53b7bcedec04415566eabb875
Author: tledkov <tl...@gridgain.com>
AuthorDate: Fri Mar 19 12:52:22 2021 +0300

    IGNITE-14274 Calcite: refactoring aggregates converter rules: single and map/reduce
---
 .../query/calcite/exec/LogicalRelImplementor.java  |  24 +-
 .../query/calcite/exec/rel/SortAggregateNode.java  |   2 +
 .../query/calcite/externalize/RelJson.java         |   1 +
 .../processors/query/calcite/prepare/Cloner.java   |  16 +-
 .../query/calcite/prepare/IgniteRelShuttle.java    |  16 +-
 .../query/calcite/prepare/PlannerPhase.java        |   8 +-
 .../query/calcite/rel/IgniteAggregateBase.java     | 287 --------------------
 .../query/calcite/rel/IgniteHashAggregate.java     |  98 -------
 .../query/calcite/rel/IgniteRelVisitor.java        |  11 +-
 .../calcite/rel/agg/IgniteHashAggregateBase.java   |  46 ++++
 .../calcite/rel/agg/IgniteMapAggregateBase.java    |  86 ++++++
 .../rel/{ => agg}/IgniteMapHashAggregate.java      |   6 +-
 .../rel/{ => agg}/IgniteMapSortAggregate.java      |  17 +-
 .../rel/{ => agg}/IgniteReduceAggregateBase.java   |  49 +++-
 .../rel/{ => agg}/IgniteReduceHashAggregate.java   |  16 +-
 .../rel/{ => agg}/IgniteReduceSortAggregate.java   |  11 +-
 .../calcite/rel/agg/IgniteSingleAggregateBase.java |  86 ++++++
 .../IgniteSingleHashAggregate.java}                |  49 +---
 .../IgniteSingleSortAggregate.java}                |  62 +----
 .../calcite/rel/agg/IgniteSortAggregateBase.java   |  71 +++++
 .../calcite/rule/HashAggregateConverterRule.java   |  84 +++++-
 .../calcite/rule/SortAggregateConverterRule.java   | 124 +++++++--
 .../query/calcite/rule/UnionConverterRule.java     |  68 +++--
 .../processors/query/calcite/trait/TraitUtils.java |  39 ---
 .../rel/HashAggregateSingleGroupExecutionTest.java |   2 +-
 .../planner/AggregateDistinctPlannerTest.java      | 150 +++++++++++
 .../calcite/planner/AggregatePlannerTest.java      | 105 ++------
 .../calcite/planner/HashAggregatePlannerTest.java  |   8 +-
 .../query/calcite/planner/PlannerTest.java         | 288 ---------------------
 .../calcite/planner/SortAggregatePlannerTest.java  | 114 +++++++-
 .../query/calcite/planner/UnionPlannerTest.java    | 240 +++++++++++++++++
 .../apache/ignite/testsuites/PlannerTestSuite.java |   2 +
 32 files changed, 1186 insertions(+), 1000 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 0d21530..0f891bb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -59,29 +59,29 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -438,7 +438,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteHashAggregate rel) {
+    @Override public Node<Row> visit(IgniteSingleHashAggregate rel) {
         AggregateType type = AggregateType.SINGLE;
 
         RelDataType rowType = rel.getRowType();
@@ -484,10 +484,10 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
         RelDataType rowType = rel.getRowType();
 
         Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
-            type, rel.aggregateCalls(), null);
+            type, rel.getAggregateCalls(), null);
         RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+        HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -497,7 +497,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteSortAggregate rel) {
+    @Override public Node<Row> visit(IgniteSingleSortAggregate rel) {
         AggregateType type = AggregateType.SINGLE;
 
         RelDataType rowType = rel.getRowType();
@@ -568,7 +568,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
 
         Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
             type,
-            rel.aggregateCalls(),
+            rel.getAggregateCalls(),
             null
         );
 
@@ -578,7 +578,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
             ctx,
             rowType,
             type,
-            rel.groupSet(),
+            rel.getGroupSet(),
             accFactory,
             rowFactory,
             expressionFactory.comparator(rel.collation())
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
index af63e34..eceb620 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
@@ -21,6 +21,7 @@ import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -84,6 +85,7 @@ public class SortAggregateNode<Row> extends AbstractNode<Row> implements SingleN
         Comparator<Row> comp
     ) {
         super(ctx, rowType);
+        assert Objects.nonNull(comp);
 
         this.type = type;
         this.accFactory = accFactory;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index af0d2cb..d454b9a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -217,6 +217,7 @@ class RelJson {
     private static final List<String> PACKAGES =
         ImmutableList.of(
             "org.apache.ignite.internal.processors.query.calcite.rel.",
+            "org.apache.ignite.internal.processors.query.calcite.rel.agg.",
             "org.apache.calcite.rel.",
             "org.apache.calcite.rel.core.",
             "org.apache.calcite.rel.logical.",
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 2069945..1b2d530 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -22,29 +22,29 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -190,7 +190,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteHashAggregate rel) {
+    @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
@@ -205,7 +205,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSortAggregate rel) {
+    @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 7e232f29..5f05cc6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -22,29 +22,29 @@ import java.util.List;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /** */
@@ -90,7 +90,7 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteHashAggregate rel) {
+    @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
         return processNode(rel);
     }
 
@@ -105,7 +105,7 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSortAggregate rel) {
+    @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
         return processNode(rel);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 6f21db7..e108efa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -163,8 +163,12 @@ public enum PlannerPhase {
                     ValuesConverterRule.INSTANCE,
                     LogicalScanConverterRule.INDEX_SCAN,
                     LogicalScanConverterRule.TABLE_SCAN,
-                    HashAggregateConverterRule.INSTANCE,
-                    SortAggregateConverterRule.INSTANCE,
+                    HashAggregateConverterRule.SINGLE,
+                    HashAggregateConverterRule.MAP_REDUCE,
+                    SortAggregateConverterRule.SINGLE,
+                    SortAggregateConverterRule.MAP_REDUCE,
+                    MergeJoinConverterRule.INSTANCE,
+                    NestedLoopJoinConverterRule.INSTANCE,
                     ProjectConverterRule.INSTANCE,
                     FilterConverterRule.INSTANCE,
                     TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
deleted file mode 100644
index 32ef760..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
-/**
- *
- */
-public abstract class IgniteAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
-    /** {@inheritDoc} */
-    protected IgniteAggregateBase(
-        RelOptCluster cluster,
-        RelTraitSet traitSet,
-        RelNode input,
-        ImmutableBitSet groupSet,
-        List<ImmutableBitSet> groupSets,
-        List<AggregateCall> aggCalls
-    ) {
-        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    protected IgniteAggregateBase(RelInput input) {
-        super(changeTraits(input, IgniteConvention.INSTANCE));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), ImmutableList.of(in.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    //Check that group by contains all key columns
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = Commons.mapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in)));
-                    }
-
-                    //Map-reduce aggregates especial for non 'group by' query, like a select count(*) from table
-                    if (groupSet.isEmpty())
-                        res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(hash(keys, distribution.function())))));
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single()))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits
-    ) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
-    @Override public @NotNull RelNode createNode(RelTraitSet outTraits, List<RelTraitSet> inTraits) {
-        RelTraitSet in = inTraits.get(0);
-
-        if (!isMapReduce(outTraits, in))
-            return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
-
-        if (U.assertionsEnabled()) {
-            ImmutableList<RelTrait> diff = in.difference(outTraits);
-
-            assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
-        }
-
-        RelNode map = createMapAggregate(
-            getCluster(),
-            in,
-            convert(getInput(), in),
-            groupSet,
-            groupSets,
-            aggCalls);
-
-        return createReduceAggregate(
-            getCluster(),
-            outTraits,
-            convert(map, outTraits),
-            groupSet,
-            groupSets,
-            aggCalls,
-            getRowType());
-    }
-
-    /** */
-    protected abstract RelNode createReduceAggregate(
-        RelOptCluster cluster,
-        RelTraitSet traits,
-        RelNode input,
-        ImmutableBitSet groupSet,
-        ImmutableList<ImmutableBitSet> groupSets,
-        List<AggregateCall> aggCalls,
-        RelDataType rowType
-    );
-
-    /** */
-    protected abstract RelNode createMapAggregate(
-        RelOptCluster cluster,
-        RelTraitSet traits,
-        RelNode input,
-        ImmutableBitSet groupSet,
-        ImmutableList<ImmutableBitSet> groupSets,
-        List<AggregateCall> aggCalls
-    );
-
-    /** */
-    private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
-        return TraitUtils.distribution(out).satisfies(single())
-            && TraitUtils.distribution(in).satisfies(random());
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
deleted file mode 100644
index a0f4613..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-
-/**
- *
- */
-public class IgniteHashAggregate extends IgniteAggregateBase {
-    /** {@inheritDoc} */
-    public IgniteHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    public IgniteHashAggregate(RelInput input) {
-        super(input);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteHashAggregate(cluster, getTraitSet(), sole(inputs),
-            getGroupSet(), getGroupSets(), getAggCallList());
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteMapHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
-        RelDataType rowType) {
-        return new IgniteReduceHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, getRowType());
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        return computeSelfCostHash(planner, mq);
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 8c620a6..e47dbf6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -17,6 +17,13 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+
 /**
  * A visitor to traverse an Ignite relational nodes tree.
  */
@@ -79,7 +86,7 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteHashAggregate rel);
+    T visit(IgniteSingleHashAggregate rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
@@ -94,7 +101,7 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteSortAggregate rel);
+    T visit(IgniteSingleSortAggregate rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java
new file mode 100644
index 0000000..d4b1f44
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteHashAggregateBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ * Shared collation behaviour for hash aggregate nodes: both hooks below replace the collation with the empty one.
+ */
+interface IgniteHashAggregateBase extends TraitsAwareIgniteRel {
+    /** {@inheritDoc} */
+    @Override default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Hash aggregation erases collation: node traits and the first input's traits both get RelCollations.EMPTY.
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
+    }
+
+    /** {@inheritDoc} */
+    @Override default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Hash aggregation erases collation, so the only derived variant carries RelCollations.EMPTY on both sides.
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+    }
+
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java
new file mode 100644
index 0000000..7d852532
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapAggregateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ * Base for map aggregate nodes: no distribution variant is derived when the input satisfies single distribution.
+ */
+public abstract class IgniteMapAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
+    /** Creates the node from explicit arguments; all of them are forwarded to the superclass constructor. */
+    protected IgniteMapAggregateBase(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** Creates the node from {@code input} with its traits changed for {@link IgniteConvention#INSTANCE}. */
+    protected IgniteMapAggregateBase(RelInput input) {
+        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
+    /** {@inheritDoc} Yields one variant: node traits unchanged, paired with the first input's traits. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits, ImmutableList.of(inputTraits.get(0))));
+    }
+
+    /** {@inheritDoc} Yields nothing if the input satisfies single distribution, else adopts the input's one. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        RelTraitSet in = inputTraits.get(0);
+
+        if (TraitUtils.distribution(in).satisfies(IgniteDistributions.single()))
+            return ImmutableList.of();
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.distribution(in)), ImmutableList.of(in)));
+    }
+
+    /** {@inheritDoc} The node takes the correlation trait of its first input; input traits are passed as is. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+            inTraits));
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
similarity index 91%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
index 49bdf9e..c6febb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapHashAggregate.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 
@@ -33,13 +33,15 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
-public class IgniteMapHashAggregate extends IgniteAggregate {
+public class IgniteMapHashAggregate extends IgniteMapAggregateBase implements IgniteHashAggregateBase {
     /** */
     public IgniteMapHashAggregate(
         RelOptCluster cluster,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
similarity index 84%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
index 171b747..53a78d8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 import java.util.Objects;
@@ -36,14 +36,16 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
 /**
  *
  */
-public class IgniteMapSortAggregate extends IgniteAggregate {
+public class IgniteMapSortAggregate extends IgniteMapAggregateBase implements IgniteSortAggregateBase {
     /** Collation. */
     private final RelCollation collation;
 
@@ -67,7 +69,7 @@ public class IgniteMapSortAggregate extends IgniteAggregate {
 
     /** */
     public IgniteMapSortAggregate(RelInput input) {
-        super(changeTraits(input, IgniteConvention.INSTANCE));
+        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
 
         collation = input.getCollation();
 
@@ -82,14 +84,15 @@ public class IgniteMapSortAggregate extends IgniteAggregate {
         ImmutableBitSet groupSet,
         List<ImmutableBitSet> groupSets,
         List<AggregateCall> aggCalls) {
-        return new IgniteMapSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
+        return new IgniteMapSortAggregate(
+            getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteMapSortAggregate(
             cluster,
-            getTraitSet(),
+            getTraitSet().replace(collation),
             sole(inputs),
             getGroupSet(),
             getGroupSets(),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
similarity index 66%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
index cfab999..0c3f171 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 
@@ -32,12 +32,18 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
 
 /**
  *
  */
-public abstract class IgniteReduceAggregateBase extends SingleRel implements IgniteRel {
+public abstract class IgniteReduceAggregateBase extends SingleRel implements TraitsAwareIgniteRel {
     /** */
     protected final ImmutableBitSet groupSet;
 
@@ -102,17 +108,50 @@ public abstract class IgniteReduceAggregateBase extends SingleRel implements Ign
     }
 
     /** */
-    public ImmutableBitSet groupSet() {
+    public ImmutableBitSet getGroupSet() {
         return groupSet;
     }
 
     /** */
-    public List<ImmutableBitSet> groupSets() {
+    public List<ImmutableBitSet> getGroupSets() {
         return groupSets;
     }
 
     /** */
-    public List<AggregateCall> aggregateCalls() {
+    public List<AggregateCall> getAggregateCalls() {
         return aggCalls;
     }
+
+    /** {@inheritDoc} The node is always marked {@link RewindabilityTrait#ONE_WAY}; first input traits are kept. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(
+            Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+    }
+
+    /** {@inheritDoc} Yields one variant in which both the node and its first input get single distribution. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        RelTraitSet in = inputTraits.get(0);
+
+        return ImmutableList.of(
+            Pair.of(
+                nodeTraits.replace(IgniteDistributions.single()),
+                ImmutableList.of(in.replace(IgniteDistributions.single()))
+            )
+        );
+    }
+
+    /** {@inheritDoc} The node takes the correlation trait of its first input; input traits are passed as is. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+            inTraits));
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
similarity index 83%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
index dcda3ac..cc3741f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceHashAggregate.java
@@ -15,29 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
-public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
+public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase implements IgniteHashAggregateBase {
     /** */
     public IgniteReduceHashAggregate(
         RelOptCluster cluster,
@@ -101,4 +106,11 @@ public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
             0
         );
     }
+
+    /** {@inheritDoc} Replaces the collation of the node and of its first input with {@link RelCollations#EMPTY}. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inTraits.get(0).replace(RelCollations.EMPTY))));
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
similarity index 89%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
index bb71bf1..83dd626 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 import java.util.Objects;
@@ -34,11 +34,14 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
 /**
  *
  */
-public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
+public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase implements IgniteSortAggregateBase {
     /** Collation. */
     private final RelCollation collation;
 
@@ -80,7 +83,7 @@ public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
             groupSets,
             aggCalls,
             rowType,
-            collation
+            TraitUtils.collation(traitSet)
         );
     }
 
@@ -88,7 +91,7 @@ public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteReduceSortAggregate(
             cluster,
-            getTraitSet(),
+            getTraitSet().replace(collation),
             sole(inputs),
             groupSet,
             groupSets,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
new file mode 100644
index 0000000..348f4b6
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+/**
+ * Base for single aggregate nodes: a distribution variant is derived only if the input satisfies single distribution.
+ */
+public abstract class IgniteSingleAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
+    /** Creates the node from explicit arguments; all of them are forwarded to the superclass constructor. */
+    protected IgniteSingleAggregateBase(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** Creates the node from {@code input} with its traits changed for {@link IgniteConvention#INSTANCE}. */
+    protected IgniteSingleAggregateBase(RelInput input) {
+        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
+    /** {@inheritDoc} Yields one variant: node traits unchanged, paired with the first input's traits. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits, ImmutableList.of(inputTraits.get(0))));
+    }
+
+    /** {@inheritDoc} Yields nothing unless the input satisfies single distribution; the node is then single too. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        RelTraitSet in = inputTraits.get(0);
+
+        if (!TraitUtils.distribution(in).satisfies(IgniteDistributions.single()))
+            return ImmutableList.of();
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), ImmutableList.of(in)));
+    }
+
+    /** {@inheritDoc} The node takes the correlation trait of its first input; input traits are passed as is. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+            inTraits));
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
similarity index 55%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
index 49bdf9e..a965595 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleHashAggregate.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 
@@ -28,43 +28,32 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 
 /**
  *
  */
-public class IgniteMapHashAggregate extends IgniteAggregate {
-    /** */
-    public IgniteMapHashAggregate(
-        RelOptCluster cluster,
-        RelTraitSet traitSet,
-        RelNode input,
-        ImmutableBitSet groupSet,
-        List<ImmutableBitSet> groupSets,
-        List<AggregateCall> aggCalls
-    ) {
+public class IgniteSingleHashAggregate extends IgniteSingleAggregateBase implements IgniteHashAggregateBase {
+    /** Creates the aggregate; every argument is passed straight to the base class constructor. */
+    public IgniteSingleHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
         super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
     }
 
-    /** */
-    public IgniteMapHashAggregate(RelInput input) {
+    /** Creates the aggregate from the given {@link RelInput}, delegating to the base class constructor. */
+    public IgniteSingleHashAggregate(RelInput input) {
         super(input);
     }
 
     /** {@inheritDoc} */
     @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteMapHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+        return new IgniteSingleHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteMapHashAggregate(cluster, getTraitSet(), sole(inputs),
+        return new IgniteSingleHashAggregate(cluster, getTraitSet(), sole(inputs),
             getGroupSet(), getGroupSets(), getAggCallList());
     }
 
@@ -74,24 +63,6 @@ public class IgniteMapHashAggregate extends IgniteAggregate {
     }
 
     /** {@inheritDoc} */
-    @Override protected RelDataType deriveRowType() {
-        return rowType(Commons.typeFactory(getCluster()));
-    }
-
-    /** */
-    public static RelDataType rowType(RelDataTypeFactory typeFactory) {
-        assert typeFactory instanceof IgniteTypeFactory;
-
-        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
-        builder.add("GROUP_ID", typeFactory.createJavaType(byte.class));
-        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
-        builder.add("AGG_DATA", typeFactory.createArrayType(typeFactory.createJavaType(Accumulator.class), -1));
-
-        return builder.build();
-    }
-
-    /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
         return computeSelfCostHash(planner, mq);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
similarity index 55%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
index 0a2e3e5..1171506 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleSortAggregate.java
@@ -15,40 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
 /**
  *
  */
-public class IgniteSortAggregate extends IgniteAggregateBase {
+public class IgniteSingleSortAggregate extends IgniteSingleAggregateBase implements IgniteSortAggregateBase {
     /** Collation. */
     private final RelCollation collation;
 
     /** {@inheritDoc} */
-    public IgniteSortAggregate(
+    public IgniteSingleSortAggregate(
         RelOptCluster cluster,
         RelTraitSet traitSet,
         RelNode input,
@@ -65,7 +61,7 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
     }
 
     /** {@inheritDoc} */
-    public IgniteSortAggregate(RelInput input) {
+    public IgniteSingleSortAggregate(RelInput input) {
         super(input);
 
         collation = input.getCollation();
@@ -76,12 +72,12 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
 
     /** {@inheritDoc} */
     @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteSortAggregate(getCluster(), traitSet.replace(collation), input, groupSet, groupSets, aggCalls);
+        return new IgniteSingleSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSortAggregate(cluster, getTraitSet(), sole(inputs),
+        return new IgniteSingleSortAggregate(cluster, getTraitSet().replace(collation), sole(inputs),
             getGroupSet(), getGroupSets(), getAggCallList());
     }
 
@@ -96,48 +92,6 @@ public class IgniteSortAggregate extends IgniteAggregateBase {
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
-        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
-    ) {
-        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));
-
-        return Pair.of(nodeTraits.replace(collation),
-            ImmutableList.of(inputTraits.get(0).replace(collation)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
-        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
-    ) {
-        RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
-
-        List<RelCollation> satisfiedCollations = TraitUtils.permute(groupSet.asList()).stream()
-            .filter(col -> inputCollation.satisfies(col))
-            .collect(Collectors.toList());
-
-        if (satisfiedCollations.isEmpty())
-            return ImmutableList.of();
-
-        return satisfiedCollations.stream()
-            .map(col -> Pair.of(nodeTraits.replace(col), inputTraits))
-            .collect(Collectors.toList());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteMapSortAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, collation);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
-        RelDataType rowType) {
-        return new IgniteReduceSortAggregate(getCluster(), traits, input, groupSet, groupSets,
-            aggCalls, getRowType(), collation);
-    }
-
-    /** {@inheritDoc} */
     @Override public RelCollation collation() {
         assert collation.equals(super.collation());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java
new file mode 100644
index 0000000..8f50c4d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSortAggregateBase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.agg;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
+
+/**
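+ * Collation pass-through and derivation shared by sort-based aggregates, both computed from the grouping set.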
+ */
+interface IgniteSortAggregateBase extends TraitsAwareIgniteRel {
+    /**
+     * Returns a bit set of the grouping fields.
+     *
+     * @return bit set of ordinals of grouping fields
+     */
+    ImmutableBitSet getGroupSet();
+
+    /** {@inheritDoc} */
+    @Override default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(getGroupSet().asList()));
+
+        return Pair.of(nodeTraits.replace(collation),
+            ImmutableList.of(inputTraits.get(0).replace(collation)));
+    }
+
+    /** {@inheritDoc} */
+    @Override default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
+
+        List<Integer> newCollation = maxPrefix(inputCollation.getKeys(), getGroupSet().asSet());
+
+        if (newCollation.size() < getGroupSet().cardinality())
+            return ImmutableList.of();
+
+        return ImmutableList.of(Pair.of(
+            nodeTraits.replace(RelCollations.of(ImmutableIntList.copyOf(newCollation))),
+            inputTraits
+        ));
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index b1cf2e2a..4e86eb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -26,29 +26,85 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 
 /**
  *
  */
-public class HashAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+public class HashAggregateConverterRule {
     /** */
-    public static final RelOptRule INSTANCE = new HashAggregateConverterRule();
+    public static final RelOptRule SINGLE = new HashSingleAggregateConverterRule();
 
     /** */
-    public HashAggregateConverterRule() {
-        super(LogicalAggregate.class, "HashAggregateConverterRule");
+    public static final RelOptRule MAP_REDUCE = new HashMapReduceAggregateConverterRule();
+
+    /** Not instantiable: this class only holds the {@link #SINGLE} and {@link #MAP_REDUCE} rules. */
+    private HashAggregateConverterRule() {
+        // No-op.
+    }
+
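+    /** Rule producing {@link IgniteSingleHashAggregate} with single distribution on both input and output. */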
+    private static class HashSingleAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+        /** */
+        HashSingleAggregateConverterRule() {
+            super(LogicalAggregate.class, "HashSingleAggregateConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+            LogicalAggregate agg) {
+            RelOptCluster cluster = agg.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            RelNode input = convert(agg.getInput(), inTrait);
+
+            return new IgniteSingleHashAggregate(
+                cluster,
+                outTrait,
+                input,
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList()
+            );
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
-        LogicalAggregate rel) {
-        RelOptCluster cluster = rel.getCluster();
-        RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelNode input = convert(rel.getInput(), inTrait);
+    /** Rule producing an {@link IgniteMapHashAggregate} under an {@link IgniteReduceHashAggregate}. */
+    private static class HashMapReduceAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+        /** */
+        HashMapReduceAggregateConverterRule() {
+            super(LogicalAggregate.class, "HashMapReduceAggregateConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+            LogicalAggregate agg) {
+            RelOptCluster cluster = agg.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            RelNode input = convert(agg.getInput(), inTrait);
+
+            RelNode map = new IgniteMapHashAggregate(
+                cluster,
+                outTrait,
+                input,
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList()
+            );
 
-        return new IgniteHashAggregate(cluster, outTrait, input,
-            rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
+            return new IgniteReduceHashAggregate(
+                cluster,
+                outTrait.replace(IgniteDistributions.single()),
+                convert(map, inTrait.replace(IgniteDistributions.single())),
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList(),
+                agg.getRowType()
+            );
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
index a4dc10a..a8e304b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
@@ -29,40 +29,108 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.typedef.F;
 
-/** */
-public class SortAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+/**
+ * Holder of the sort aggregate converter rules: {@link #SINGLE} and {@link #MAP_REDUCE}.
+ */
+public class SortAggregateConverterRule {
+    /** */
+    public static final RelOptRule SINGLE = new SortSingleAggregateConverterRule();
+
+    /** */
+    public static final RelOptRule MAP_REDUCE = new SortMapReduceAggregateConverterRule();
+
     /** */
-    public static final RelOptRule INSTANCE = new SortAggregateConverterRule();
+    private SortAggregateConverterRule() {
+        // No-op.
+    }
 
     /** */
-    public SortAggregateConverterRule() {
-        super(LogicalAggregate.class, "SortAggregateConverterRule");
+    private static class SortSingleAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+        /** */
+        SortSingleAggregateConverterRule() {
+            super(LogicalAggregate.class, "SortSingleAggregateConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+            LogicalAggregate agg) {
+            // Applicable only when the group set is non-empty and there is at most one grouping set.
+            if (F.isEmpty(agg.getGroupSet()) || agg.getGroupSets().size() > 1)
+                return null;
+
+            RelOptCluster cluster = agg.getCluster();
+            RelNode input = agg.getInput();
+
+            RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(agg.getGroupSet().asList()));
+
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+                .replace(collation)
+                .replace(IgniteDistributions.single());
+
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+                .replace(collation)
+                .replace(IgniteDistributions.single());
+
+            return new IgniteSingleSortAggregate(
+                cluster,
+                outTrait,
+                convert(input, inTrait),
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList()
+            );
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
-        LogicalAggregate rel) {
-        // Applicable only for GROUP BY
-        if (F.isEmpty(rel.getGroupSet()) || rel.getGroupSets().size() > 1)
-            return null;
-
-        RelOptCluster cluster = rel.getCluster();
-        RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelNode input = rel.getInput();
-
-        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(rel.getGroupSet().asList()));
-
-        return new IgniteSortAggregate(
-            cluster,
-            outTrait.replace(collation),
-            convert(input, inTrait.replace(collation)),
-            rel.getGroupSet(),
-            rel.getGroupSets(),
-            rel.getAggCallList()
-        );
+    /** Rule producing an {@link IgniteMapSortAggregate} under an {@link IgniteReduceSortAggregate}. */
+    private static class SortMapReduceAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+        /** */
+        SortMapReduceAggregateConverterRule() {
+            super(LogicalAggregate.class, "SortMapReduceAggregateConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
+            LogicalAggregate agg) {
+            // Applicable only when the group set is non-empty and there is at most one grouping set.
+            if (F.isEmpty(agg.getGroupSet()) || agg.getGroupSets().size() > 1)
+                return null;
+
+            RelOptCluster cluster = agg.getCluster();
+            RelNode input = agg.getInput();
+
+            RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(agg.getGroupSet().asList()));
+
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE)
+                .replace(collation);
+
+            RelNode map = new IgniteMapSortAggregate(
+                cluster,
+                outTrait,
+                convert(input, inTrait),
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList(),
+                collation
+            );
+
+            return new IgniteReduceSortAggregate(
+                cluster,
+                outTrait.replace(IgniteDistributions.single()),
+                convert(map, inTrait.replace(IgniteDistributions.single())),
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                agg.getAggCallList(),
+                agg.getRowType(),
+                collation
+            );
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
index cd85348..46a438c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
@@ -19,45 +19,77 @@ package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.logical.LogicalUnion;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
-public class UnionConverterRule extends AbstractIgniteConverterRule<LogicalUnion> {
-    /** */
-    public static final RelOptRule INSTANCE = new UnionConverterRule();
+public class UnionConverterRule extends RelRule<UnionConverterRule.Config> {
+    /** Instance. */
+    public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
 
     /** */
-    public UnionConverterRule() {
-        super(LogicalUnion.class, "UnionConverterRule");
+    public UnionConverterRule(Config cfg) {
+        super(cfg);
     }
 
     /** {@inheritDoc} */
-    @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalUnion rel) {
-        RelOptCluster cluster = rel.getCluster();
+    @Override public void onMatch(RelOptRuleCall call) {
+        final LogicalUnion union = call.rel(0);
+
+        RelOptCluster cluster = union.getCluster();
         RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        List<RelNode> inputs = Commons.transform(rel.getInputs(), input -> convert(input, traits));
+        List<RelNode> inputs = Commons.transform(union.getInputs(), input -> convert(input, traits));
+
+        RelNode res = new IgniteUnionAll(cluster, traits, inputs);
+
+        if (!union.all) {
+            final RelBuilder relBuilder = relBuilderFactory.create(union.getCluster(), null);
+
+            relBuilder
+                .push(res)
+                .aggregate(relBuilder.groupKey(ImmutableBitSet.range(union.getRowType().getFieldCount())));
+
+            res = convert(relBuilder.build(), union.getTraitSet());
+        }
+
+        call.transformTo(res);
+    }
 
-        PhysicalNode res = new IgniteUnionAll(cluster, traits, inputs);
+    /**
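+     * Configuration of {@link UnionConverterRule}; {@link #DEFAULT} matches {@link LogicalUnion} with any inputs.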
+     */
+    public interface Config extends RelRule.Config {
+        /** Default configuration: logical rel builder factory, operand is any {@link LogicalUnion}. */
+        UnionConverterRule.Config DEFAULT = RelRule.Config.EMPTY
+            .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+            .withDescription("UnionConverterRule")
+            .as(UnionConverterRule.Config.class)
+            .withOperandFor(LogicalUnion.class);
 
-        if (!rel.all)
-            res = new IgniteHashAggregate(cluster, traits, res,
-                ImmutableBitSet.range(rel.getRowType().getFieldCount()), null, ImmutableList.of());
+        /** Defines an operand tree for the given classes. */
+        default UnionConverterRule.Config withOperandFor(Class<? extends LogicalUnion> union) {
+            return withOperandSupplier(
+                o0 -> o0.operand(union).anyInputs()
+            )
+                .as(UnionConverterRule.Config.class);
+        }
 
-        return res;
+        /** {@inheritDoc} */
+        @Override default UnionConverterRule toRule() {
+            return new UnionConverterRule(this);
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 6cd77f1..04094fd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,7 +49,6 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
@@ -417,42 +414,6 @@ public class TraitUtils {
     }
 
     /**
-     * Creates collations list with all permutation of specified keys.
-     *
-     * @param keys The keys to create collation from.
-     * @return New collation.
-     */
-    public static List<RelCollation> permute(List<Integer> keys) {
-        keys = new ArrayList<>(keys);
-
-        List<RelCollation> res = new ArrayList<>();
-
-        int[] indexes = new int[keys.size()];
-        Arrays.fill(indexes, 0);
-
-        res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
-
-        int i = 0;
-
-        while (i < keys.size()) {
-            if (indexes[i] < i) {
-                Collections.swap(keys, i % 2 == 0 ? 0 : indexes[i], i);
-
-                res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
-
-                indexes[i]++;
-                i = 0;
-            }
-            else {
-                indexes[i] = 0;
-                i++;
-            }
-        }
-
-        return res;
-    }
-
-    /**
      * @param elem Elem.
      */
     private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 5f99aa3..40752d6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java
new file mode 100644
index 0000000..8bef6fb
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregateDistinctPlannerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Planner test for {@code SELECT DISTINCT} queries, parameterized by aggregate algorithm (sort or hash).
+ */
+@RunWith(Parameterized.class)
+public class AggregateDistinctPlannerTest extends AbstractAggregatePlannerTest {
+    /** Aggregate algorithm under test; injected by the parameterized runner. */
+    @Parameterized.Parameter
+    public AggregateAlgorithm algo;
+
+    /** @return One single-element parameter array per {@link AggregateAlgorithm} constant. */
+    @Parameterized.Parameters(name = "Algorithm = {0}")
+    public static List<Object[]> parameters() {
+        return Stream.of(AggregateAlgorithm.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+    }
+
+    /**
+     * Checks DISTINCT plans to map/reduce aggregates with no aggregate calls (and an index scan for SORT).
+     * @throws Exception If failed.
+     */
+    @Test
+    public void mapReduceDistinctWithIndex() throws Exception {
+        TestTable tbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT DISTINCT val0, val1 FROM test";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            algo.rulesToDisable
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+        IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+        Assert.assertTrue(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.isEmpty(rdcAgg.getAggregateCalls()));
+
+        Assert.assertTrue(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.isEmpty(mapAgg.getAggCallList()));
+
+        if (algo == AggregateAlgorithm.SORT)
+            assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+    }
+
+    /** Aggregate algorithms: the plan node classes to look for and the rule names handed to the planner. */
+    enum AggregateAlgorithm {
+        /** Sort-based aggregates; both hash rules and the single sort rule are listed for disabling. */
+        SORT(
+            IgniteSingleSortAggregate.class,
+            IgniteMapSortAggregate.class,
+            IgniteReduceSortAggregate.class,
+            "HashSingleAggregateConverterRule",
+            "HashMapReduceAggregateConverterRule",
+            "SortSingleAggregateConverterRule"
+        ),
+
+        /**
+         * Hash-based aggregates; both sort rules and the single hash rule are listed for disabling.
+         */
+        HASH(
+            IgniteSingleHashAggregate.class,
+            IgniteMapHashAggregate.class,
+            IgniteReduceHashAggregate.class,
+            "SortSingleAggregateConverterRule",
+            "SortMapReduceAggregateConverterRule",
+            "HashSingleAggregateConverterRule"
+        );
+
+        /** Single aggregate node class (not read by the test method in this class). */
+        public final Class<? extends IgniteSingleAggregateBase> single;
+
+        /** Map aggregate node class searched for in the physical plan. */
+        public final Class<? extends IgniteMapAggregateBase> map;
+
+        /** Reduce aggregate node class searched for in the physical plan. */
+        public final Class<? extends IgniteReduceAggregateBase> reduce;
+
+        /** Rule names passed to {@code physicalPlan} as the rules to disable. */
+        public final String[] rulesToDisable;
+
+        /** Stores the node classes and the rule names for this algorithm. */
+        AggregateAlgorithm(
+            Class<? extends IgniteSingleAggregateBase> single,
+            Class<? extends IgniteMapAggregateBase> map,
+            Class<? extends IgniteReduceAggregateBase> reduce,
+            String... rulesToDisable) {
+            this.single = single;
+            this.map = map;
+            this.reduce = reduce;
+            this.rulesToDisable = rulesToDisable;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
index 3ba52ba..1d36838 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -26,25 +26,22 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.fun.SqlAvgAggFunction;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregateBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregateBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.util.typedef.F;
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -52,7 +49,6 @@ import org.junit.runners.Parameterized;
 /**
  *
  */
-@SuppressWarnings({"TypeMayBeWeakened"})
 @RunWith(Parameterized.class)
 public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
     /** Algorithm. */
@@ -81,12 +77,12 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
         IgniteRel phys = physicalPlan(
             sql,
             publicSchema,
-            algo.ruleToDisable
+            algo.rulesToDisable
         );
 
         checkSplitAndSerialization(phys, publicSchema);
 
-        IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+        IgniteSingleAggregateBase agg = findFirstNode(phys, byClass(algo.single));
 
         assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
 
@@ -104,8 +100,6 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
      */
     @Test
     public void singleWithIndex() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
         TestTable tbl = createBroadcastTable().addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_grp1");
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
@@ -117,12 +111,12 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
         IgniteRel phys = physicalPlan(
             sql,
             publicSchema,
-            algo.ruleToDisable
+            algo.rulesToDisable
         );
 
         checkSplitAndSerialization(phys, publicSchema);
 
-        IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+        IgniteSingleAggregateBase agg = findFirstNode(phys, byClass(algo.single));
 
         assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
 
@@ -151,20 +145,20 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
         IgniteRel phys = physicalPlan(
             sql,
             publicSchema,
-            algo.ruleToDisable
+            algo.rulesToDisable
         );
 
         checkSplitAndSerialization(phys, publicSchema);
 
-        IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+        IgniteMapAggregateBase mapAgg = findFirstNode(phys, byClass(algo.map));
         IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
 
-        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
         assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
 
         Assert.assertThat(
             "Invalid plan\n" + RelOptUtil.toString(phys),
-            F.first(rdcAgg.aggregateCalls()).getAggregation(),
+            F.first(rdcAgg.getAggregateCalls()).getAggregation(),
             IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
 
         Assert.assertThat(
@@ -176,89 +170,46 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
             assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
     }
 
-    /**
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    @Ignore("Single aggregates must be disabled by hint: https://issues.apache.org/jira/browse/IGNITE-14274")
-    public void mapReduceDistinctWithIndex() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable tbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("TEST", tbl);
-
-        String sql = "SELECT DISTINCT val0, val1 FROM test";
-
-        IgniteRel phys = physicalPlan(
-            sql,
-            publicSchema,
-            algo.ruleToDisable
-        );
-
-        checkSplitAndSerialization(phys, publicSchema);
-
-        IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
-        IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
-
-        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
-        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
-
-        Assert.assertTrue(
-            "Invalid plan\n" + RelOptUtil.toString(phys),
-            F.isEmpty(rdcAgg.aggregateCalls()));
-
-        Assert.assertTrue(
-            "Invalid plan\n" + RelOptUtil.toString(phys),
-            F.isEmpty(mapAgg.getAggCallList()));
-
-        if (algo == AggregateAlgorithm.SORT)
-            assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
-    }
-
     /** */
     enum AggregateAlgorithm {
         /** */
         SORT(
-            IgniteSortAggregate.class,
+            IgniteSingleSortAggregate.class,
             IgniteMapSortAggregate.class,
             IgniteReduceSortAggregate.class,
-            "HashAggregateConverterRule"
+            "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
         ),
 
         /** */
         HASH(
-            IgniteHashAggregate.class,
+            IgniteSingleHashAggregate.class,
             IgniteMapHashAggregate.class,
             IgniteReduceHashAggregate.class,
-            "SortAggregateConverterRule"
+            "SortSingleAggregateConverterRule", "SortMapReduceAggregateConverterRule"
         );
 
         /** */
-        public final Class<? extends IgniteAggregateBase> single;
+        public final Class<? extends IgniteSingleAggregateBase> single;
 
         /** */
-        public final Class<? extends IgniteAggregate> map;
+        public final Class<? extends IgniteMapAggregateBase> map;
 
         /** */
         public final Class<? extends IgniteReduceAggregateBase> reduce;
 
         /** */
-        public final String ruleToDisable;
+        public final String[] rulesToDisable;
 
         /** */
         AggregateAlgorithm(
-            Class<? extends IgniteAggregateBase> single,
-            Class<? extends IgniteAggregate> map,
+            Class<? extends IgniteSingleAggregateBase> single,
+            Class<? extends IgniteMapAggregateBase> map,
             Class<? extends IgniteReduceAggregateBase> reduce,
-            String ruleToDisable) {
+            String... rulesToDisable) {
             this.single = single;
             this.map = map;
             this.reduce = reduce;
-            this.ruleToDisable = ruleToDisable;
+            this.rulesToDisable = rulesToDisable;
         }
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
index e7596a4..6cdbe2f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregatePlannerTest.java
@@ -27,9 +27,9 @@ import org.apache.calcite.sql.fun.SqlCountAggFunction;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -95,7 +95,7 @@ public class HashAggregatePlannerTest extends AbstractAggregatePlannerTest {
 
         Assert.assertThat(
             "Invalid plan\n" + RelOptUtil.toString(phys),
-            F.first(rdcAgg.aggregateCalls()).getAggregation(),
+            F.first(rdcAgg.getAggregateCalls()).getAggregation(),
             IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
 
         Assert.assertThat(
@@ -132,7 +132,7 @@ public class HashAggregatePlannerTest extends AbstractAggregatePlannerTest {
 
         Assert.assertThat(
             "Invalid plan\n" + RelOptUtil.toString(phys),
-            F.first(rdcAgg.aggregateCalls()).getAggregation(),
+            F.first(rdcAgg.getAggregateCalls()).getAggregation(),
             IsInstanceOf.instanceOf(SqlCountAggFunction.class));
 
         Assert.assertThat(
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 08a2595..42787033 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -339,294 +339,6 @@ public class PlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testUnion() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable tbl1 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table1", "hash");
-            }
-        };
-
-        TestTable tbl2 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table2", "hash");
-            }
-        };
-
-        TestTable tbl3 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table3", "hash");
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("TABLE1", tbl1);
-        publicSchema.addTable("TABLE2", tbl2);
-        publicSchema.addTable("TABLE3", tbl3);
-
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
-        String sql = "" +
-            "SELECT * FROM table1 " +
-            "UNION " +
-            "SELECT * FROM table2 " +
-            "UNION " +
-            "SELECT * FROM table3 ";
-
-        RelTraitDef<?>[] traitDefs = {
-            DistributionTraitDef.INSTANCE,
-            ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            RewindabilityTraitDef.INSTANCE,
-            CorrelationTraitDef.INSTANCE
-        };
-
-        PlanningContext ctx = PlanningContext.builder()
-            .localNodeId(F.first(nodes))
-            .originatingNodeId(F.first(nodes))
-            .parentContext(Contexts.empty())
-            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .build())
-            .logger(log)
-            .query(sql)
-            .parameters(2)
-            .topologyVersion(AffinityTopologyVersion.NONE)
-            .build();
-
-        RelNode root;
-
-        try (IgnitePlanner planner = ctx.planner()) {
-            assertNotNull(planner);
-
-            String qry = ctx.query();
-
-            assertNotNull(qry);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(qry);
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            root = planner.convert(sqlNode);
-
-            // Transformation chain
-            root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
-            // Transformation chain
-            RelTraitSet desired = root.getCluster().traitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
-        }
-
-        assertNotNull(root);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testUnionAll() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable tbl1 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table1", "hash");
-            }
-        };
-
-        TestTable tbl2 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table2", "hash");
-            }
-        };
-
-        TestTable tbl3 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Table3", "hash");
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("TABLE1", tbl1);
-        publicSchema.addTable("TABLE2", tbl2);
-        publicSchema.addTable("TABLE3", tbl3);
-
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
-        String sql = "" +
-            "SELECT * FROM table1 " +
-            "UNION ALL " +
-            "SELECT * FROM table2 " +
-            "UNION ALL " +
-            "SELECT * FROM table3 ";
-
-        RelTraitDef<?>[] traitDefs = {
-            DistributionTraitDef.INSTANCE,
-            ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            RewindabilityTraitDef.INSTANCE,
-            CorrelationTraitDef.INSTANCE
-        };
-
-        PlanningContext ctx = PlanningContext.builder()
-            .localNodeId(F.first(nodes))
-            .originatingNodeId(F.first(nodes))
-            .parentContext(Contexts.empty())
-            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .build())
-            .logger(log)
-            .query(sql)
-            .parameters(2)
-            .topologyVersion(AffinityTopologyVersion.NONE)
-            .build();
-
-        RelNode root;
-
-        try (IgnitePlanner planner = ctx.planner()) {
-            assertNotNull(planner);
-
-            String qry = ctx.query();
-
-            assertNotNull(qry);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(qry);
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            root = planner.convert(sqlNode);
-
-            // Transformation chain
-            root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
-            // Transformation chain
-            RelTraitSet desired = root.getCluster().traitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
-        }
-
-        assertNotNull(root);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
     public void testHepPlaner() throws Exception {
         IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
index d5bd8a6..88f9cdb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortAggregatePlannerTest.java
@@ -17,10 +17,24 @@
 
 package org.apache.ignite.internal.processors.query.calcite.planner;
 
+import java.util.Arrays;
+
 import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
@@ -46,10 +60,108 @@ public class SortAggregatePlannerTest extends AbstractAggregatePlannerTest {
             () -> physicalPlan(
                 sqlMin,
                 publicSchema,
-                "HashAggregateConverterRule"
+                "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
             ),
             RelOptPlanner.CannotPlanException.class,
             "There are not enough rules to produce a node with desired properties"
         );
     }
+
+    /** Permuted GROUP BY keys, broadcast table: expects {@link IgniteSingleSortAggregate} and no {@link IgniteSort}.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void collationPermuteSingle() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_1"); // Fields 3, 4 are GRP0, GRP1.
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT MIN(val0) FROM test GROUP BY grp1, grp0"; // Keys in reverse of the index order.
+
+        IgniteRel phys = physicalPlan( // NOTE(review): trailing names look like rules to disable - confirm.
+            sql,
+            publicSchema,
+            "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
+        );
+
+        IgniteSingleSortAggregate agg = findFirstNode(phys, byClass(IgniteSingleSortAggregate.class));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+        assertNull( // The plan must not contain any sort node.
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            findFirstNode(phys, byClass(IgniteSort.class))
+        );
+    }
+
+    /** Permuted GROUP BY keys, affinity table: expects {@link IgniteReduceSortAggregate} and no {@link IgniteSort}.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void collationPermuteMapReduce() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "test", "hash");
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_1"); // Fields 3, 4 are GRP0, GRP1.
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT MIN(val0) FROM test GROUP BY grp1, grp0"; // Keys in reverse of the index order.
+
+        IgniteRel phys = physicalPlan( // NOTE(review): trailing names look like rules to disable - confirm.
+            sql,
+            publicSchema,
+            "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
+        );
+
+        IgniteReduceSortAggregate agg = findFirstNode(phys, byClass(IgniteReduceSortAggregate.class));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+        assertNull( // The plan must not contain any sort node.
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            findFirstNode(phys, byClass(IgniteSort.class))
+        );
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java
new file mode 100644
index 0000000..8847904
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/UnionPlannerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+
+/**
+ * Planner smoke tests for UNION and UNION ALL queries over three affinity-distributed test tables.
+ */
+//@WithSystemProperty(key = "calcite.debug", value = "true")
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class UnionPlannerTest extends AbstractPlannerTest {
+    /** Plans a three-way {@code UNION} and checks only that a physical plan is produced.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUnion() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl1 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table1", "hash");
+            }
+        };
+
+        TestTable tbl2 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table2", "hash");
+            }
+        };
+
+        TestTable tbl3 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table3", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TABLE1", tbl1);
+        publicSchema.addTable("TABLE2", tbl2);
+        publicSchema.addTable("TABLE3", tbl3);
+
+        SchemaPlus schema = createRootSchema(false) // NOTE(review): 'schema' is never read below.
+            .add("PUBLIC", publicSchema);
+
+        String sql = "" +
+            "SELECT * FROM table1 " +
+            "UNION " +
+            "SELECT * FROM table2 " +
+            "UNION " +
+            "SELECT * FROM table3 ";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema
+        );
+
+        System.out.println("+++ " + RelOptUtil.toString(phys)); // Prints the plan to stdout.
+
+        assertNotNull(phys); // Only plan existence is verified; its shape is not asserted.
+    }
+
+    /** Plans a three-way {@code UNION ALL} and checks only that a physical plan is produced.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUnionAll() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl1 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table1", "hash");
+            }
+        };
+
+        TestTable tbl2 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table2", "hash");
+            }
+        };
+
+        TestTable tbl3 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Table3", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TABLE1", tbl1);
+        publicSchema.addTable("TABLE2", tbl2);
+        publicSchema.addTable("TABLE3", tbl3);
+
+        SchemaPlus schema = createRootSchema(false) // NOTE(review): 'schema' is never read below.
+            .add("PUBLIC", publicSchema);
+
+        String sql = "" +
+            "SELECT * FROM table1 " +
+            "UNION ALL " +
+            "SELECT * FROM table2 " +
+            "UNION ALL " +
+            "SELECT * FROM table3 ";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema
+        );
+
+        System.out.println("+++ " + RelOptUtil.toString(phys)); // Prints the plan to stdout.
+
+        assertNotNull(phys); // Only plan existence is verified; its shape is not asserted.
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index c6cb5ba..50c715f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testsuites;
 
+import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
@@ -38,6 +39,7 @@ import org.junit.runners.Suite;
     TableSpoolPlannerTest.class,
     IndexSpoolPlannerTest.class,
     AggregatePlannerTest.class,
+    AggregateDistinctPlannerTest.class,
     HashAggregatePlannerTest.class,
     SortAggregatePlannerTest.class,
     JoinColocationPlannerTest.class,