You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/04 10:20:52 UTC

[ignite] branch sql-calcite updated: IGNITE-13543 Calcite. Introduce sort-based aggregates

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new dcf6913  IGNITE-13543 Calcite. Introduce sort-based aggregates
dcf6913 is described below

commit dcf6913677abda06e4244a55e8f7cc9e218eb7c8
Author: tledkov <tl...@gridgain.com>
AuthorDate: Thu Mar 4 13:20:33 2021 +0300

    IGNITE-13543 Calcite. Introduce sort-based aggregates
---
 .../query/calcite/exec/LogicalRelImplementor.java  | 126 ++++-
 .../query/calcite/exec/exp/ExpressionFactory.java  |   8 +-
 .../calcite/exec/exp/ExpressionFactoryImpl.java    |   9 +-
 .../calcite/exec/exp/agg/AccumulatorsFactory.java  |  16 +-
 .../query/calcite/exec/exp/agg/AggregateType.java} |  28 +-
 .../{AggregateNode.java => HashAggregateNode.java} |  18 +-
 .../query/calcite/exec/rel/SortAggregateNode.java  | 304 ++++++++++++
 .../query/calcite/externalize/RelJson.java         |   1 +
 .../calcite/metadata/IgniteMdDistinctRowCount.java |  52 +++
 .../query/calcite/metadata/IgniteMetadata.java     |   1 +
 .../query/calcite/metadata/cost/IgniteCost.java    |   3 +
 .../processors/query/calcite/prepare/Cloner.java   |  29 +-
 .../query/calcite/prepare/IgniteRelShuttle.java    |  31 +-
 .../query/calcite/prepare/PlannerPhase.java        |   6 +-
 .../query/calcite/rel/IgniteAggregate.java         | 274 +++--------
 ...niteAggregate.java => IgniteAggregateBase.java} | 122 ++---
 .../query/calcite/rel/IgniteHashAggregate.java     |  98 ++++
 ...pAggregate.java => IgniteMapHashAggregate.java} |  66 +--
 .../query/calcite/rel/IgniteMapSortAggregate.java  | 137 ++++++
 ...gregate.java => IgniteReduceAggregateBase.java} |  54 +--
 .../calcite/rel/IgniteReduceHashAggregate.java     | 104 +++++
 .../calcite/rel/IgniteReduceSortAggregate.java     | 123 +++++
 .../query/calcite/rel/IgniteRelVisitor.java        |  21 +-
 .../query/calcite/rel/IgniteSortAggregate.java     | 151 ++++++
 ...erRule.java => HashAggregateConverterRule.java} |  12 +-
 ...erRule.java => SortAggregateConverterRule.java} |  36 +-
 .../query/calcite/rule/UnionConverterRule.java     |   5 +-
 .../processors/query/calcite/trait/TraitUtils.java |  40 ++
 .../query/calcite/AggregatesIntegrationTest.java   |   4 +
 .../calcite/SortAggregateIntegrationTest.java      | 154 ++++++
 .../calcite/exec/rel/AbstractExecutionTest.java    |  47 +-
 .../query/calcite/exec/rel/BaseAggregateTest.java  | 448 ++++++++++++++++++
 .../query/calcite/exec/rel/ExecutionTest.java      | 511 +-------------------
 .../exec/rel/HashAggregateExecutionTest.java       | 138 ++++++
 .../rel/HashAggregateSingleGroupExecutionTest.java | 518 +++++++++++++++++++++
 .../exec/rel/SortAggregateExecutionTest.java       | 146 ++++++
 .../query/calcite/planner/AbstractPlannerTest.java |   5 +
 .../calcite/planner/AggregatePlannerTest.java      | 397 ++++++++++++++++
 .../query/calcite/planner/HashAggregateTest.java   | 105 +++++
 .../query/calcite/planner/PlannerTest.java         |  93 ----
 .../ignite/testsuites/ExecutionTestSuite.java      |   6 +
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   6 +-
 .../apache/ignite/testsuites/PlannerTestSuite.java |   4 +-
 43 files changed, 3393 insertions(+), 1064 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e9a1ec3..6e628e7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -36,7 +36,9 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
@@ -54,19 +56,22 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNo
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.UnionAllNode;
 import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -433,8 +438,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteAggregate rel) {
-        AggregateNode.AggregateType type = AggregateNode.AggregateType.SINGLE;
+    @Override public Node<Row> visit(IgniteHashAggregate rel) {
+        AggregateType type = AggregateType.SINGLE;
 
         RelDataType rowType = rel.getRowType();
         RelDataType inputType = rel.getInput().getRowType();
@@ -443,7 +448,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
             type, rel.getAggCallList(), inputType);
         RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
+        HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -453,8 +458,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteMapAggregate rel) {
-        AggregateNode.AggregateType type = AggregateNode.AggregateType.MAP;
+    @Override public Node<Row> visit(IgniteMapHashAggregate rel) {
+        AggregateType type = AggregateType.MAP;
 
         RelDataType rowType = rel.getRowType();
         RelDataType inputType = rel.getInput().getRowType();
@@ -463,7 +468,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
             type, rel.getAggCallList(), inputType);
         RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
+        HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.getGroupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -473,8 +478,8 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteReduceAggregate rel) {
-        AggregateNode.AggregateType type = AggregateNode.AggregateType.REDUCE;
+    @Override public Node<Row> visit(IgniteReduceHashAggregate rel) {
+        AggregateType type = AggregateType.REDUCE;
 
         RelDataType rowType = rel.getRowType();
 
@@ -482,7 +487,102 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
             type, rel.aggregateCalls(), null);
         RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+        HashAggregateNode<Row> node = new HashAggregateNode<>(ctx, rowType, type, rel.groupSets(), accFactory, rowFactory);
+
+        Node<Row> input = visit(rel.getInput());
+
+        node.register(input);
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteSortAggregate rel) {
+        AggregateType type = AggregateType.SINGLE;
+
+        RelDataType rowType = rel.getRowType();
+        RelDataType inputType = rel.getInput().getRowType();
+
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+            type,
+            rel.getAggCallList(),
+            inputType
+        );
+
+        RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+        SortAggregateNode<Row> node = new SortAggregateNode<>(
+            ctx,
+            rowType,
+            type,
+            rel.getGroupSet(),
+            accFactory,
+            rowFactory,
+            expressionFactory.comparator(rel.collation())
+        );
+
+        Node<Row> input = visit(rel.getInput());
+
+        node.register(input);
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteMapSortAggregate rel) {
+        AggregateType type = AggregateType.MAP;
+
+        RelDataType rowType = rel.getRowType();
+        RelDataType inputType = rel.getInput().getRowType();
+
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+            type,
+            rel.getAggCallList(),
+            inputType
+        );
+
+        RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+        SortAggregateNode<Row> node = new SortAggregateNode<>(
+            ctx,
+            rowType,
+            type,
+            rel.getGroupSet(),
+            accFactory,
+            rowFactory,
+            expressionFactory.comparator(rel.collation())
+        );
+
+        Node<Row> input = visit(rel.getInput());
+
+        node.register(input);
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteReduceSortAggregate rel) {
+        AggregateType type = AggregateType.REDUCE;
+
+        RelDataType rowType = rel.getRowType();
+
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory = expressionFactory.accumulatorsFactory(
+            type,
+            rel.aggregateCalls(),
+            null
+        );
+
+        RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+        SortAggregateNode<Row> node = new SortAggregateNode<>(
+            ctx,
+            rowType,
+            type,
+            rel.groupSet(),
+            accFactory,
+            rowFactory,
+            expressionFactory.comparator(rel.collation())
+            );
 
         Node<Row> input = visit(rel.getInput());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
index 7361ca2..7255955 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
@@ -31,14 +31,18 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 
 /**
  * Expression factory.
  */
 public interface ExpressionFactory<Row> {
     /** */
-    Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(AggregateNode.AggregateType type, List<AggregateCall> calls, RelDataType rowType);
+    Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(
+        AggregateType type,
+        List<AggregateCall> calls,
+        RelDataType rowType
+    );
 
     /**
      * Creates a comparator for given data type and collations. Mainly used for sorted exchange.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index c1a883f..ef10723 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -61,7 +61,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorsFactory;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
@@ -102,8 +102,11 @@ public class ExpressionFactoryImpl<Row> implements ExpressionFactory<Row> {
     }
 
     /** {@inheritDoc} */
-    @Override public Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(AggregateNode.AggregateType type,
-        List<AggregateCall> calls, RelDataType rowType) {
+    @Override public Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(
+        AggregateType type,
+        List<AggregateCall> calls,
+        RelDataType rowType
+    ) {
         return new AccumulatorsFactory<>(ctx, type, calls, rowType);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
index 7cd8ece..afc803c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
@@ -50,7 +50,6 @@ import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -146,8 +145,12 @@ public class AccumulatorsFactory<Row> implements Supplier<List<AccumulatorWrappe
     private final List<WrapperPrototype> prototypes;
 
     /** */
-    public AccumulatorsFactory(ExecutionContext<Row> ctx, AggregateType type,
-        List<AggregateCall> aggCalls, RelDataType inputRowType) {
+    public AccumulatorsFactory(
+        ExecutionContext<Row> ctx,
+        AggregateType type,
+        List<AggregateCall> aggCalls,
+        RelDataType inputRowType
+    ) {
         this.ctx = ctx;
         this.type = type;
         this.inputRowType = inputRowType;
@@ -265,9 +268,12 @@ public class AccumulatorsFactory<Row> implements Supplier<List<AccumulatorWrappe
         private final RowHandler<Row> handler;
 
         /** */
-        AccumulatorWrapperImpl(Accumulator accumulator, AggregateCall call,
+        AccumulatorWrapperImpl(
+            Accumulator accumulator,
+            AggregateCall call,
             Function<Object[], Object[]> inAdapter,
-            Function<Object, Object> outAdapter) {
+            Function<Object, Object> outAdapter
+        ) {
             this.accumulator = accumulator;
             this.inAdapter = inAdapter;
             this.outAdapter = outAdapter;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
similarity index 53%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
index ad329f1..c743e2a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AggregateType.java
@@ -15,24 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
-
-import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;
 
 /**
- * Calcite tests.
+ *
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-    PlannerTest.class,
-    CorrelatedNestedLoopJoinPlannerTest.class,
-    TableSpoolPlannerTest.class,
-    IndexSpoolPlannerTest.class
-})
-public class PlannerTestSuite {
+public enum AggregateType {
+    /** Map phase. */
+    MAP,
+
+    /** Reduce phase. */
+    REDUCE,
+
+    /** Single phase aggregate. */
+    SINGLE
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 08743da..1f4728e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
@@ -44,7 +45,7 @@ import static org.apache.ignite.internal.processors.query.calcite.util.Commons.n
 /**
  *
  */
-public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
     /** */
     private final AggregateType type;
 
@@ -72,7 +73,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
     /**
      * @param ctx Execution context.
      */
-    public AggregateNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, List<ImmutableBitSet> grpSets,
+    public HashAggregateNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, List<ImmutableBitSet> grpSets,
         Supplier<List<AccumulatorWrapper<Row>>> accFactory, RowFactory<Row> rowFactory) {
         super(ctx, rowType);
 
@@ -215,19 +216,6 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
     }
 
     /** */
-    @SuppressWarnings("PublicInnerClass")
-    public enum AggregateType {
-        /** Map phase. */
-        MAP,
-
-        /** Reduce phase. */
-        REDUCE,
-
-        /** Single phase aggregate. */
-        SINGLE
-    }
-
-    /** */
     private class Grouping {
         /** */
         private final byte grpId;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
new file mode 100644
index 0000000..af63e34
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class SortAggregateNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Supplier<List<AccumulatorWrapper<Row>>> accFactory;
+
+    /** */
+    private final RowFactory<Row> rowFactory;
+
+    /** */
+    private final ImmutableBitSet grpSet;
+
+    /** */
+    private final Comparator<Row> comp;
+
+    /** */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /** */
+    private Row prevRow;
+
+    /** */
+    private Group grp;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** */
+    private int cmpRes;
+
+    /**
+     * @param ctx Execution context.
+     */
+    public SortAggregateNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        AggregateType type,
+        ImmutableBitSet grpSet,
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory,
+        RowFactory<Row> rowFactory,
+        Comparator<Row> comp
+    ) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.accFactory = accFactory;
+        this.rowFactory = rowFactory;
+        this.grpSet = grpSet;
+        this.comp = comp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0 && requested == 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!outBuf.isEmpty())
+            doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        if (grp != null) {
+            int cmp = comp.compare(row, prevRow);
+
+            if (cmp == 0)
+                grp.add(row);
+            else {
+                if (cmpRes == 0)
+                    cmpRes = cmp;
+                else
+                    assert Integer.signum(cmp) == Integer.signum(cmpRes) : "Input not sorted";
+
+                outBuf.add(grp.row());
+
+                grp = newGroup(row);
+
+                doPush();
+            }
+        }
+        else
+            grp = newGroup(row);
+
+        prevRow = row;
+
+        if (waiting == 0 && requested > 0) {
+            waiting = IN_BUFFER_SIZE;
+
+            context().execute(() -> source().request(IN_BUFFER_SIZE), this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting = -1;
+
+        if (grp != null) {
+            outBuf.add(grp.row());
+
+            doPush();
+        }
+
+        if (requested > 0)
+            downstream().end();
+
+        grp = null;
+        prevRow = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        grp = null;
+        prevRow = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** */
+    private Group newGroup(Row r) {
+        final Object[] grpKeys = new Object[grpSet.cardinality()];
+        List<Integer> fldIdxs = grpSet.asList();
+
+        final RowHandler<Row> rowHandler = rowFactory.handler();
+
+        for (int i = 0; i < grpKeys.length; ++i)
+            grpKeys[i] = rowHandler.get(fldIdxs.get(i), r);
+
+        Group grp = new Group(grpKeys);
+
+        grp.add(r);
+
+        return grp;
+    }
+
+    /** */
+    private void doPush() throws Exception {
+        while (requested > 0 && !outBuf.isEmpty()) {
+            requested--;
+
+            downstream().push(outBuf.poll());
+        }
+    }
+
+    /** */
+    private class Group {
+        /** */
+        private final List<AccumulatorWrapper<Row>> accumWrps;
+
+        /** */
+        private final RowHandler<Row> handler;
+
+        /** */
+        private final Object[] grpKeys;
+
+        /** */
+        private Group(Object[] grpKeys) {
+            this.grpKeys = grpKeys;
+
+            accumWrps = accFactory.get();
+
+            handler = context().rowHandler();
+        }
+
+        /** */
+        private void add(Row row) {
+            if (type == AggregateType.REDUCE)
+                addOnReducer(row);
+            else
+                addOnMapper(row);
+        }
+
+        /** */
+        private Row row() {
+            if (type == AggregateType.MAP)
+                return rowOnMapper();
+            else
+                return rowOnReducer();
+        }
+
+        /** */
+        private void addOnMapper(Row row) {
+            for (AccumulatorWrapper<Row> wrapper : accumWrps)
+                wrapper.add(row);
+        }
+
+        /** */
+        private void addOnReducer(Row row) {
+            List<Accumulator> accums = (List<Accumulator>)handler.get(handler.columnCount(row) - 1, row);
+
+            for (int i = 0; i < accums.size(); i++) {
+                AccumulatorWrapper<Row> wrapper = accumWrps.get(i);
+
+                Accumulator accum = accums.get(i);
+
+                wrapper.apply(accum);
+            }
+        }
+
+        /** */
+        private Row rowOnMapper() {
+            Object[] fields = new Object[grpSet.cardinality() + 1];
+
+            int i = 0;
+
+            for (Object grpKey : grpKeys)
+                fields[i++] = grpKey;
+
+            // Last column is the accumulators collection.
+            fields[i] = Commons.transform(accumWrps, AccumulatorWrapper::accumulator);
+
+            return rowFactory.create(fields);
+        }
+
+        /** */
+        private Row rowOnReducer() {
+            Object[] fields = new Object[grpSet.cardinality() + accumWrps.size()];
+
+            int i = 0;
+
+            for (Object grpKey : grpKeys)
+                fields[i++] = grpKey;
+
+            for (AccumulatorWrapper<Row> accWrp : accumWrps)
+                fields[i++] = accWrp.end();
+
+            return rowFactory.create(fields);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index cf01709..b91e3e0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -656,6 +656,7 @@ class RelJson {
         map.put("type", toJson(node.getType()));
         map.put("distinct", node.isDistinct());
         map.put("operands", node.getArgList());
+        map.put("filter", node.filterArg);
         map.put("name", node.getName());
         return map;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java
new file mode 100644
index 0000000..15a668b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/** */
+@SuppressWarnings("unused") // actually all methods are used by runtime generated classes
+public class IgniteMdDistinctRowCount extends RelMdDistinctRowCount {
+    public static final RelMetadataProvider SOURCE =
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            BuiltInMethod.DISTINCT_ROW_COUNT.method, new IgniteMdDistinctRowCount());
+
+    /** {@inheritDoc} */
+    @Override public Double getDistinctRowCount(
+        RelSubset rel,
+        RelMetadataQuery mq,
+        ImmutableBitSet groupKey,
+        RexNode predicate
+    ) {
+        if (groupKey.cardinality() == 0)
+            return 1d;
+
+        double rowCount = mq.getRowCount(rel);
+
+        rowCount *= 1.0 - Math.pow(.5, groupKey.cardinality());
+
+        return rowCount;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index a8a941e..b279c05 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -47,6 +47,7 @@ public class IgniteMetadata {
                 IgniteMdPredicates.SOURCE,
                 IgniteMdCollation.SOURCE,
                 IgniteMdSelectivity.SOURCE,
+                IgniteMdDistinctRowCount.SOURCE,
 
                 // Basic providers
                 DefaultRelMetadataProvider.INSTANCE));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
index a78ff09..60bd64e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@ -37,6 +37,9 @@ public class IgniteCost implements RelOptCost {
     /** Cost of a comparison of one row. */
     public static final double ROW_COMPARISON_COST = 3;
 
+    /** Memory cost of a aggregate call. */
+    public static final double AGG_CALL_MEM_COST = 5;
+
     /**
      * With broadcast distribution each row will be sent to the each distination node,
      * thus the total bytes amount will be multiplies of the destination nodes count.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 34e1ee7..2069945 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -19,23 +19,26 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -187,17 +190,31 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteAggregate rel) {
+    @Override public IgniteRel visit(IgniteHashAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapAggregate rel) {
+    @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceAggregate rel) {
+    @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+        return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSortAggregate rel) {
+        return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
+        return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 870962c..7e232f29 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -18,23 +18,27 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -86,17 +90,32 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteAggregate rel) {
+    @Override public IgniteRel visit(IgniteHashAggregate rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSortAggregate rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapAggregate rel) {
+    @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceAggregate rel) {
+    @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
         return processNode(rel);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 768fc24..58d8667 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -39,14 +39,15 @@ import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.tools.Program;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
-import org.apache.ignite.internal.processors.query.calcite.rule.AggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.CorrelatedNestedLoopJoinRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
@@ -159,7 +160,8 @@ public enum PlannerPhase {
                     ValuesConverterRule.INSTANCE,
                     LogicalScanConverterRule.INDEX_SCAN,
                     LogicalScanConverterRule.TABLE_SCAN,
-                    AggregateConverterRule.INSTANCE,
+                    HashAggregateConverterRule.INSTANCE,
+                    SortAggregateConverterRule.INSTANCE,
                     MergeJoinConverterRule.INSTANCE,
                     NestedLoopJoinConverterRule.INSTANCE,
                     ProjectConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index e951b34..debbdcc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -17,258 +17,104 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
+public abstract class IgniteAggregate extends Aggregate implements IgniteRel {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    protected IgniteAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregate(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), ImmutableList.of(in.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = Commons.inverseMapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), ImmutableList.of(in)));
-                    }
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), ImmutableList.of(in.replace(single()))));
-    }
+    /** */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
+        if (groupsCnt != null)
+            return groupsCnt;
 
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+        // Estimation of the groups count is not available.
+        // Use heuristic estimation for result rows count.
+        return super.estimateRowCount(mq);
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
-    @Override public @NotNull RelNode createNode(RelTraitSet outTraits, List<RelTraitSet> inTraits) {
-        RelTraitSet in = inTraits.get(0);
-
-        if (!isMapReduce(outTraits, in))
-            return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
-
-        if (U.assertionsEnabled()) {
-            ImmutableList<RelTrait> diff = in.difference(outTraits);
-
-            assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
+    /** */
+    public double estimateMemoryForGroup(RelMetadataQuery mq) {
+        double mem = groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        if (!aggCalls.isEmpty()) {
+            double grps = estimateRowCount(mq);
+            double rows = input.estimateRowCount(mq);
+
+            for (AggregateCall aggCall : aggCalls) {
+                if (aggCall.isDistinct())
+                    mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+                else
+                    mem += IgniteCost.AGG_CALL_MEM_COST;
+            }
         }
 
-        RelNode map = new IgniteMapAggregate(getCluster(), in, convert(getInput(), in), groupSet, groupSets, aggCalls);
-        return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, outTraits), groupSet, groupSets, aggCalls, getRowType());
+        return mem;
     }
 
     /** */
-    private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
-        return TraitUtils.distribution(out).satisfies(single())
-            && TraitUtils.distribution(in).satisfies(random());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
-            getGroupSet(), getGroupSets(), getAggCallList());
+    public RelOptCost computeSelfCostHash(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double inRows = mq.getRowCount(getInput());
+        double groups = estimateRowCount(mq);
+
+        return costFactory.makeCost(
+            inRows,
+            inRows * IgniteCost.ROW_PASS_THROUGH_COST,
+            0,
+            groups * estimateMemoryForGroup(mq),
+            0
+        );
     }
 
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = mq.getRowCount(getInput());
-
-        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
-        // currently it's OK to have such a dummy cost because there is no other options
-        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+    /** */
+    public RelOptCost computeSelfCostSort(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double inRows = mq.getRowCount(getInput());
+
+        return costFactory.makeCost(
+            inRows,
+            inRows * IgniteCost.ROW_PASS_THROUGH_COST,
+            0,
+            estimateMemoryForGroup(mq),
+            0
+        );
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
similarity index 73%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
index e951b34..2380615 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
@@ -22,22 +22,17 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -59,29 +54,29 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUti
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
+public abstract class IgniteAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
+    protected IgniteAggregateBase(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregateBase(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
     /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         // Distribution propagation is based on next rules:
         // 1) Any aggregation is possible on single or broadcast distribution.
         // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
@@ -141,14 +136,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         // Aggregate is rewindable if its input is rewindable.
 
         RelTraitSet in = inputTraits.get(0);
@@ -161,7 +152,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         // Distribution propagation is based on next rules:
         // 1) Any aggregation is possible on single or broadcast distribution.
         // 2) hash-distributed aggregation is possible in case it's a simple aggregate having hash distributed input
@@ -220,16 +214,10 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
         return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
             inTraits));
     }
@@ -247,28 +235,48 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
             assert diff.size() == 1 && F.first(diff) == TraitUtils.distribution(outTraits);
         }
 
-        RelNode map = new IgniteMapAggregate(getCluster(), in, convert(getInput(), in), groupSet, groupSets, aggCalls);
-        return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, outTraits), groupSet, groupSets, aggCalls, getRowType());
+        RelNode map = createMapAggregate(
+            getCluster(),
+            in,
+            convert(getInput(), in),
+            groupSet,
+            groupSets,
+            aggCalls);
+
+        return createReduceAggregate(
+            getCluster(),
+            outTraits,
+            convert(map, outTraits),
+            groupSet,
+            groupSets,
+            aggCalls,
+            getRowType());
     }
 
     /** */
+    protected abstract RelNode createReduceAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        ImmutableList<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType
+    );
+
+    /** */
+    protected abstract RelNode createMapAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        ImmutableList<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    );
+
+    /** */
     private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
         return TraitUtils.distribution(out).satisfies(single())
             && TraitUtils.distribution(in).satisfies(random());
     }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
-            getGroupSet(), getGroupSets(), getAggCallList());
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = mq.getRowCount(getInput());
-
-        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
-        // currently it's OK to have such a dummy cost because there is no other options
-        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
new file mode 100644
index 0000000..a0f4613
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashAggregate.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+/**
+ *
+ */
+public class IgniteHashAggregate extends IgniteAggregateBase {
+    /** {@inheritDoc} */
+    public IgniteHashAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    public IgniteHashAggregate(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        return new IgniteHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteHashAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Since it's a hash aggregate it erases collation.
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Since it's a hash aggregate it erases collation.
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        return new IgniteMapHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
+        RelDataType rowType) {
+        return new IgniteReduceHashAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, getRowType());
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostHash(planner, mq);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
similarity index 60%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
index 808301c..49bdf9e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapHashAggregate.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -32,30 +33,39 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
 /**
  *
  */
-public class IgniteMapAggregate extends Aggregate implements IgniteRel {
-    public IgniteMapAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+public class IgniteMapHashAggregate extends IgniteAggregate {
+    /** */
+    public IgniteMapHashAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
     }
 
     /** */
-    public IgniteMapAggregate(RelInput input) {
-        super(changeTraits(input, IgniteConvention.INSTANCE));
+    public IgniteMapHashAggregate(RelInput input) {
+        super(input);
     }
 
     /** {@inheritDoc} */
     @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-        return new IgniteMapAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+        return new IgniteMapHashAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteMapHashAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList());
     }
 
     /** {@inheritDoc} */
@@ -68,37 +78,6 @@ public class IgniteMapAggregate extends Aggregate implements IgniteRel {
         return rowType(Commons.typeFactory(getCluster()));
     }
 
-    /**
-     * @return Map aggregate relational node distribution calculated on the basis of its input and groupingSets.
-     * <b>Note</b> that the method returns {@code null} in case the given input cannot be processed in map-reduce
-     * style by an aggregate.
-     */
-    public static IgniteDistribution distribution(IgniteDistribution inputDistr, ImmutableBitSet groupSet,
-        List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-
-        switch (inputDistr.getType()) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return inputDistr;
-
-            case RANDOM_DISTRIBUTED:
-            case HASH_DISTRIBUTED:
-                return random(); // its OK to just erase distribution here
-
-            default:
-                throw new AssertionError("Unexpected distribution type.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = mq.getRowCount(getInput());
-
-        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
-        // currently it's OK to have such a dummy cost because there is no other options
-        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
-    }
-
     /** */
     public static RelDataType rowType(RelDataTypeFactory typeFactory) {
         assert typeFactory instanceof IgniteTypeFactory;
@@ -113,8 +92,7 @@ public class IgniteMapAggregate extends Aggregate implements IgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteMapAggregate(cluster, getTraitSet(), sole(inputs),
-            getGroupSet(), getGroupSets(), getAggCallList());
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostHash(planner, mq);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
new file mode 100644
index 0000000..171b747
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapSortAggregate.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ *
+ */
+public class IgniteMapSortAggregate extends IgniteAggregate {
+    /** Collation. */
+    private final RelCollation collation;
+
+    /** */
+    public IgniteMapSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelCollation collation
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+
+        this.collation = collation;
+    }
+
+    /** */
+    public IgniteMapSortAggregate(RelInput input) {
+        super(changeTraits(input, IgniteConvention.INSTANCE));
+
+        collation = input.getCollation();
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls) {
+        return new IgniteMapSortAggregate(getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteMapSortAggregate(
+            cluster,
+            getTraitSet(),
+            sole(inputs),
+            getGroupSet(),
+            getGroupSets(),
+            getAggCallList(),
+            collation
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelDataType deriveRowType() {
+        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+        groupSet.forEach(fieldIdx -> {
+            RelDataTypeField fld = input.getRowType().getFieldList().get(fieldIdx);
+
+            builder.add(fld);
+        });
+
+        builder.add("AGG_DATA", typeFactory.createArrayType(typeFactory.createJavaType(Accumulator.class), -1));
+
+        return builder.build();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostSort(planner, mq);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelCollation collation() {
+        return collation;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
similarity index 61%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
index 1e4f88b..cfab999 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregateBase.java
@@ -18,12 +18,10 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
@@ -31,34 +29,37 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.Aggregate.Group;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
-public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
+public abstract class IgniteReduceAggregateBase extends SingleRel implements IgniteRel {
     /** */
-    private final ImmutableBitSet groupSet;
+    protected final ImmutableBitSet groupSet;
 
     /** */
-    private final List<ImmutableBitSet> groupSets;
+    protected final List<ImmutableBitSet> groupSets;
 
     /** */
-    private final List<AggregateCall> aggCalls;
+    protected final List<AggregateCall> aggCalls;
 
     /** */
-    public IgniteReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, RelDataType rowType) {
+    protected IgniteReduceAggregateBase(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType
+    ) {
         super(cluster, traits, input);
 
         assert rowType != null;
-        assert RelOptUtil.areRowTypesEqual(input.getRowType(),
-            IgniteMapAggregate.rowType(Commons.typeFactory(cluster)), true);
         this.groupSet = groupSet;
         if (groupSets == null)
             groupSets = ImmutableList.of(groupSet);
@@ -68,7 +69,7 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
     }
 
     /** */
-    public IgniteReduceAggregate(RelInput input) {
+    protected IgniteReduceAggregateBase(RelInput input) {
         this(
             input.getCluster(),
             input.getTraitSet().replace(IgniteConvention.INSTANCE),
@@ -85,16 +86,6 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new IgniteReduceAggregate(getCluster(), traitSet, sole(inputs), groupSet, groupSets, aggCalls, rowType);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
     @Override public RelWriter explainTerms(RelWriter pw) {
         super.explainTerms(pw)
             .itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES)
@@ -110,15 +101,6 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
         return pw;
     }
 
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = mq.getRowCount(getInput());
-
-        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
-        // currently it's OK to have such a dummy cost because there is no other options
-        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
-    }
-
     /** */
     public ImmutableBitSet groupSet() {
         return groupSet;
@@ -133,10 +115,4 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
     public List<AggregateCall> aggregateCalls() {
         return aggCalls;
     }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteReduceAggregate(cluster, getTraitSet(), sole(inputs),
-            groupSet, groupSets, aggCalls, rowType);
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
new file mode 100644
index 0000000..dcda3ac
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceHashAggregate.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ *
+ */
+public class IgniteReduceHashAggregate extends IgniteReduceAggregateBase {
+    /** */
+    public IgniteReduceHashAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType
+    ) {
+        super(cluster, traits, input, groupSet, groupSets, aggCalls, rowType);
+
+        assert RelOptUtil.areRowTypesEqual(input.getRowType(),
+            IgniteMapHashAggregate.rowType(Commons.typeFactory(cluster)), true);
+    }
+
+    /** */
+    public IgniteReduceHashAggregate(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteReduceHashAggregate(getCluster(), traitSet, sole(inputs), groupSet, groupSets, aggCalls, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceHashAggregate(cluster, getTraitSet(), sole(inputs),
+            groupSet, groupSets, aggCalls, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double rows = mq.getRowCount(getInput());
+
+        double mem = 0d;
+        if (aggCalls.isEmpty())
+            mem = groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+        else {
+            for (AggregateCall aggCall : aggCalls) {
+                if (aggCall.isDistinct())
+                    mem += IgniteCost.AGG_CALL_MEM_COST * rows;
+                else
+                    mem += IgniteCost.AGG_CALL_MEM_COST;
+            }
+        }
+
+        return costFactory.makeCost(
+            rows,
+            rows * IgniteCost.ROW_PASS_THROUGH_COST,
+            0,
+            mem,
+            0
+        );
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
new file mode 100644
index 0000000..bb71bf1
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceSortAggregate.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+
+/**
+ *
+ */
+public class IgniteReduceSortAggregate extends IgniteReduceAggregateBase {
+    /** Collation. */
+    private final RelCollation collation;
+
+    /** */
+    public IgniteReduceSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls,
+        RelDataType rowType,
+        RelCollation collation
+    ) {
+        super(cluster, traits, input, groupSet, groupSets, aggCalls, rowType);
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+
+        this.collation = collation;
+    }
+
+    /** */
+    public IgniteReduceSortAggregate(RelInput input) {
+        super(input);
+        collation = input.getCollation();
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteReduceSortAggregate(
+            getCluster(),
+            traitSet,
+            sole(inputs),
+            groupSet,
+            groupSets,
+            aggCalls,
+            rowType,
+            collation
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceSortAggregate(
+            cluster,
+            getTraitSet(),
+            sole(inputs),
+            groupSet,
+            groupSets,
+            aggCalls,
+            rowType,
+            collation
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double rows = mq.getRowCount(getInput());
+
+        return costFactory.makeCost(
+            rows,
+            rows * IgniteCost.ROW_PASS_THROUGH_COST,
+            0
+        );
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 2db2cc1..8c620a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -79,17 +79,32 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteAggregate rel);
+    T visit(IgniteHashAggregate rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteMapAggregate rel);
+    T visit(IgniteMapHashAggregate rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteReduceAggregate rel);
+    T visit(IgniteReduceHashAggregate rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteSortAggregate rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteMapSortAggregate rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteReduceSortAggregate rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
new file mode 100644
index 0000000..0a2e3e5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ *
+ */
+public class IgniteSortAggregate extends IgniteAggregateBase {
+    /** Collation. */
+    private final RelCollation collation;
+
+    /** {@inheritDoc} */
+    public IgniteSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+
+        assert !TraitUtils.collation(traitSet).isDefault();
+        assert !groupSet.isEmpty() && groupSets.size() == 1;
+
+        collation = TraitUtils.collation(traitSet);
+    }
+
+    /** {@inheritDoc} */
+    public IgniteSortAggregate(RelInput input) {
+        super(input);
+
+        collation = input.getCollation();
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        return new IgniteSortAggregate(getCluster(), traitSet.replace(collation), input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteSortAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));
+
+        return Pair.of(nodeTraits.replace(collation),
+            ImmutableList.of(inputTraits.get(0).replace(collation)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+        RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
+    ) {
+        RelCollation inputCollation = TraitUtils.collation(inputTraits.get(0));
+
+        List<RelCollation> satisfiedCollations = TraitUtils.permute(groupSet.asList()).stream()
+            .filter(col -> inputCollation.satisfies(col))
+            .collect(Collectors.toList());
+
+        if (satisfiedCollations.isEmpty())
+            return ImmutableList.of();
+
+        return satisfiedCollations.stream()
+            .map(col -> Pair.of(nodeTraits.replace(col), inputTraits))
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createMapAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        return new IgniteMapSortAggregate(getCluster(), traits, input, groupSet, groupSets, aggCalls, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected RelNode createReduceAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        ImmutableBitSet groupSet, ImmutableList<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls,
+        RelDataType rowType) {
+        return new IgniteReduceSortAggregate(getCluster(), traits, input, groupSet, groupSets,
+            aggCalls, getRowType(), collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelCollation collation() {
+        assert collation.equals(super.collation());
+
+        return collation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSelfCostSort(planner, mq);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
similarity index 83%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index 0955517..b1cf2e2a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -25,19 +25,19 @@ import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 
 /**
  *
  */
-public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+public class HashAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
     /** */
-    public static final RelOptRule INSTANCE = new AggregateConverterRule();
+    public static final RelOptRule INSTANCE = new HashAggregateConverterRule();
 
     /** */
-    public AggregateConverterRule() {
-        super(LogicalAggregate.class, "AggregateConverterRule");
+    public HashAggregateConverterRule() {
+        super(LogicalAggregate.class, "HashAggregateConverterRule");
     }
 
     /** {@inheritDoc} */
@@ -48,7 +48,7 @@ public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalA
         RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
         RelNode input = convert(rel.getInput(), inTrait);
 
-        return new IgniteAggregate(cluster, outTrait, input,
+        return new IgniteHashAggregate(cluster, outTrait, input,
             rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
similarity index 62%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
index 0955517..a4dc10a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AggregateConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortAggregateConverterRule.java
@@ -22,33 +22,47 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.util.typedef.F;
 
-/**
- *
- */
-public class AggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
+/** */
+public class SortAggregateConverterRule extends AbstractIgniteConverterRule<LogicalAggregate> {
     /** */
-    public static final RelOptRule INSTANCE = new AggregateConverterRule();
+    public static final RelOptRule INSTANCE = new SortAggregateConverterRule();
 
     /** */
-    public AggregateConverterRule() {
-        super(LogicalAggregate.class, "AggregateConverterRule");
+    public SortAggregateConverterRule() {
+        super(LogicalAggregate.class, "SortAggregateConverterRule");
     }
 
     /** {@inheritDoc} */
     @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
         LogicalAggregate rel) {
+        // Applicable only for GROUP BY
+        if (F.isEmpty(rel.getGroupSet()) || rel.getGroupSets().size() > 1)
+            return null;
+
         RelOptCluster cluster = rel.getCluster();
         RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
         RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelNode input = convert(rel.getInput(), inTrait);
+        RelNode input = rel.getInput();
+
+        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(rel.getGroupSet().asList()));
 
-        return new IgniteAggregate(cluster, outTrait, input,
-            rel.getGroupSet(), rel.getGroupSets(), rel.getAggCallList());
+        return new IgniteSortAggregate(
+            cluster,
+            outTrait.replace(collation),
+            convert(input, inTrait.replace(collation)),
+            rel.getGroupSet(),
+            rel.getGroupSets(),
+            rel.getAggCallList()
+        );
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
index 9183c62..cd85348 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -28,8 +29,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
@@ -54,7 +55,7 @@ public class UnionConverterRule extends AbstractIgniteConverterRule<LogicalUnion
         PhysicalNode res = new IgniteUnionAll(cluster, traits, inputs);
 
         if (!rel.all)
-            res = new IgniteAggregate(cluster, traits, res,
+            res = new IgniteHashAggregate(cluster, traits, res,
                 ImmutableBitSet.range(rel.getRowType().getFieldCount()), null, ImmutableList.of());
 
         return res;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 3856c83..b128566 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -45,6 +47,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
@@ -406,6 +409,43 @@ public class TraitUtils {
     }
 
     /**
+     * Creates collations list with all permutation of specified keys.
+     *
+     * @param keys The keys to create collation from.
+     * @return New collation.
+     */
+    public static List<RelCollation> permute(List<Integer> keys) {
+        keys = new ArrayList<>(keys);
+
+        List<RelCollation> res = new ArrayList<>();
+
+        int[] indexes = new int[keys.size()];
+        Arrays.fill(indexes, 0);
+
+        res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
+
+        int i = 0;
+
+        while (i < keys.size()) {
+            if (indexes[i] < i) {
+                Collections.swap(keys, i % 2 == 0 ? 0 : indexes[i], i);
+
+                res.add(RelCollations.of(ImmutableIntList.copyOf(keys)));
+
+                indexes[i]++;
+                i = 0;
+            }
+            else {
+                indexes[i] = 0;
+                i++;
+            }
+        }
+
+        return res;
+    }
+
+
+    /**
      * @param elem Elem.
      */
     private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
index 439c06e..5a0dfe8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
@@ -42,6 +42,10 @@ public class AggregatesIntegrationTest extends AbstractBasicIntegrationTest {
         assertQuery("select count(*) from person where salary > 10").returns(2L).check();
         assertQuery("select count(1) from person where salary > 10").returns(2L).check();
 
+        assertQuery("select count(name) filter (where salary > 10) from person").returns(1L).check();
+        assertQuery("select count(*) filter (where salary > 10) from person").returns(2L).check();
+        assertQuery("select count(1) filter (where salary > 10) from person").returns(2L).check();
+
         assertQuery("select salary, count(name) from person group by salary order by salary")
             .returns(10d, 3L)
             .returns(15d, 1L)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
new file mode 100644
index 0000000..718b2e2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Sort aggregate integration test.
+ */
+public class SortAggregateIntegrationTest extends GridCommonAbstractTest {
+    /** */
+    public static final int ROWS = 103;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        fillCache(grid(0).cache("TEST"), ROWS);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid(0).cache("TEST").clear();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        QueryEntity part = new QueryEntity()
+            .setTableName("TEST")
+            .setKeyType(Integer.class.getName())
+            .setValueType(TestVal.class.getName())
+            .setKeyFieldName("ID")
+            .addQueryField("ID", Integer.class.getName(), null)
+            .addQueryField("GRP0", Integer.class.getName(), null)
+            .addQueryField("GRP1", Integer.class.getName(), null)
+            .addQueryField("VAL0", Integer.class.getName(), null)
+            .addQueryField("VAL1", Integer.class.getName(), null)
+            .setIndexes(Collections.singletonList(new QueryIndex(Arrays.asList("GRP0", "GRP1"), QueryIndexType.SORTED)));
+
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(
+                new CacheConfiguration<>(part.getTableName())
+                    .setAffinity(new RendezvousAffinityFunction(false, 8))
+                    .setCacheMode(CacheMode.PARTITIONED)
+                    .setQueryEntities(singletonList(part))
+                    .setSqlSchema("PUBLIC")
+            );
+    }
+
+    /** */
+    @Test
+    public void mapReduceAggregate() {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors = engine.query(
+            null,
+            "PUBLIC",
+            "SELECT /*+ DISABLE_RULE('HashAggregateConverterRule') */" +
+                "SUM(val0), SUM(val1), grp0 FROM TEST " +
+                "GROUP BY grp0 " +
+                "HAVING SUM(val1) > 10",
+            X.EMPTY_OBJECT_ARRAY
+        );
+
+        List<List<?>> res = cursors.get(0).getAll();
+
+        assertEquals(ROWS / 10, res.size());
+
+        res.forEach(r -> {
+            Integer s0 = (Integer)r.get(0);
+            Integer s1 = (Integer)r.get(1);
+
+            assertEquals(s0 * 2, (int)s1);
+        });
+    }
+
+    /**
+     * @param c Cache.
+     * @param rows Rows count.
+     */
+    private void fillCache(IgniteCache c, int rows) throws InterruptedException {
+        c.clear();
+
+        for (int i = 0; i < rows; ++i)
+            c.put(i, new TestVal(i));
+
+        awaitPartitionMapExchange();
+    }
+
+    /** */
+    public static class TestVal {
+        /** */
+        int grp0;
+
+        /** */
+        int grp1;
+
+        /** */
+        int val0;
+
+        /** */
+        int val1;
+
+        /** */
+        TestVal(int k) {
+            grp0 = k / 10;
+            grp1 = k / 100;
+
+            val0 = 1;
+            val1 = 2;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 02bc083..a490e6c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -292,6 +292,11 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
     }
 
     /** */
+    protected Object[] row(Object... fields) {
+        return fields;
+    }
+
+    /** */
     private static class TestMessageServiceImpl extends MessageServiceImpl {
         /** */
         private final TestIoManager mgr;
@@ -338,24 +343,38 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
 
         /** */
         TestTable(int rowsCnt, RelDataType rowType) {
+            this(
+                rowsCnt,
+                rowType,
+                rowType.getFieldList().stream()
+                    .map((Function<RelDataTypeField, Function<Integer, Object>>)(t) -> {
+                        switch (t.getType().getSqlTypeName().getFamily()) {
+                            case NUMERIC:
+                                return TestTable::intField;
+
+                            case CHARACTER:
+                                return TestTable::stringField;
+
+                            default:
+                                assert false : "Not supported type for test: " + t;
+                                return null;
+                        }
+                    })
+                    .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()])
+            );
+        }
+
+        /** */
+        TestTable(int rowsCnt, RelDataType rowType, Function<Integer, Object>... fieldCreators) {
             this.rowsCnt = rowsCnt;
             this.rowType = rowType;
+            this.fieldCreators = fieldCreators;
+        }
 
-            fieldCreators = rowType.getFieldList().stream()
-                .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
-                    switch (t.getType().getSqlTypeName().getFamily()) {
-                        case NUMERIC:
-                            return TestTable::intField;
-
-                        case CHARACTER:
-                            return TestTable::stringField;
 
-                        default:
-                            assert false : "Not supported type for test: " + t;
-                            return null;
-                    }
-                })
-                .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
+        /** */
+        private static Object field(Integer rowNum) {
+            return "val_" + rowNum;
         }
 
         /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
new file mode 100644
index 0000000..4ef09e6
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public abstract class BaseAggregateTest extends AbstractExecutionTest {
+    /** Last parameter number. */
+    protected static final int TEST_AGG_PARAM_NUM = LAST_PARAM_NUM + 1;
+
+    /** */
+    @Parameterized.Parameter(TEST_AGG_PARAM_NUM)
+    public TestAggregateType testAgg;
+
+    /** */
+    @Parameterized.Parameters(name = PARAMS_STRING + ", type={" + TEST_AGG_PARAM_NUM + "}")
+    public static List<Object[]> data() {
+        List<Object[]> extraParams = new ArrayList<>();
+
+        ImmutableList<Object[]> newParams = ImmutableList.of(
+            new Object[] {TestAggregateType.SINGLE},
+            new Object[] {TestAggregateType.MAP_REDUCE}
+        );
+
+        for (Object[] newParam : newParams) {
+            for (Object[] inheritedParam : AbstractExecutionTest.parameters()) {
+                Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam))
+                    .toArray(Object[]::new);
+
+                extraParams.add(both);
+            }
+        }
+
+        return extraParams;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Before
+    @Override public void setup() throws Exception {
+        nodesCnt = 1;
+        super.setup();
+    }
+
+    /** */
+    @Test
+    public void count() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row(0, 200),
+            row(1, 300),
+            row(1, 1400),
+            row(0, 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.COUNT,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+        RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+        SingleNode<Object[]> aggChain = createAggregateNodesChain(
+            ctx,
+            grpSets,
+            call,
+            rowType,
+            aggRowType,
+            rowFactory(),
+            scan
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+        root.register(aggChain);
+
+        assertTrue(root.hasNext());
+
+        Assert.assertArrayEquals(row(0, 2), root.next());
+        Assert.assertArrayEquals(row(1, 2), root.next());
+
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void min() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row(0, 200),
+            row(1, 300),
+            row(1, 1400),
+            row(0, 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MIN,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+        RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+        SingleNode<Object[]> aggChain = createAggregateNodesChain(
+            ctx,
+            grpSets,
+            call,
+            rowType,
+            aggRowType,
+            rowFactory(),
+            scan
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+        root.register(aggChain);
+
+        assertTrue(root.hasNext());
+
+        Assert.assertArrayEquals(row(0, 200), root.next());
+        Assert.assertArrayEquals(row(1, 300), root.next());
+
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void max() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row(0, 200),
+            row(1, 300),
+            row(1, 1400),
+            row(0, 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MAX,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+        RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+        SingleNode<Object[]> aggChain = createAggregateNodesChain(
+            ctx,
+            grpSets,
+            call,
+            rowType,
+            aggRowType,
+            rowFactory(),
+            scan
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+        root.register(aggChain);
+
+        assertTrue(root.hasNext());
+
+        Assert.assertArrayEquals(row(0, 1000), root.next());
+        Assert.assertArrayEquals(row(1, 1400), root.next());
+
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void avg() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row(0, 200),
+            row(1, 300),
+            row(1, 1300),
+            row(0, 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.AVG,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+        RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+        SingleNode<Object[]> aggChain = createAggregateNodesChain(
+            ctx,
+            grpSets,
+            call,
+            rowType,
+            aggRowType,
+            rowFactory(),
+            scan
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+        root.register(aggChain);
+
+        assertTrue(root.hasNext());
+
+        Assert.assertArrayEquals(row(0, 600), root.next());
+        Assert.assertArrayEquals(row(1, 800), root.next());
+
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void sumOnDifferentRowsCount() throws IgniteCheckedException {
+        int bufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        int[] grpsCount = {1, bufSize / 2, bufSize, bufSize + 1, bufSize * 4};
+        int[] rowsInGroups = {1, 5, bufSize};
+
+        for (int grps : grpsCount) {
+            for (int rowsInGroup : rowsInGroups) {
+                log.info("Check: [grps=" + grps + ", rowsInGroup=" + rowsInGroup + ']');
+
+                ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+                IgniteTypeFactory tf = ctx.getTypeFactory();
+                RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+
+                ScanNode<Object[]> scan = new ScanNode<>(
+                    ctx,
+                    rowType,
+                    new TestTable(
+                        grps * rowsInGroup,
+                        rowType,
+                        (r) -> r / rowsInGroup,
+                        (r) -> r % rowsInGroup
+                    )
+                );
+
+                AggregateCall call = AggregateCall.create(
+                    SqlStdOperatorTable.SUM,
+                    false,
+                    false,
+                    false,
+                    ImmutableIntList.of(1),
+                    -1,
+                    RelCollations.EMPTY,
+                    tf.createJavaType(int.class),
+                    null);
+
+                ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(0));
+
+                RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+                SingleNode<Object[]> aggChain = createAggregateNodesChain(
+                    ctx,
+                    grpSets,
+                    call,
+                    rowType,
+                    aggRowType,
+                    rowFactory(),
+                    scan
+                );
+
+                RootNode<Object[]> root = new RootNode<>(ctx, aggRowType);
+                root.register(aggChain);
+
+                Set<Integer> grpId = IntStream.range(0, grps).boxed().collect(Collectors.toSet());
+
+                while (root.hasNext()) {
+                    Object[] row = root.next();
+
+                    grpId.remove(row[0]);
+
+                    assertEquals((rowsInGroup - 1) * rowsInGroup / 2, row[1]);
+                }
+
+                assertTrue(grpId.isEmpty());
+            }
+        }
+    }
+
+
+    /** */
+    private SingleNode<Object[]> createAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall aggCall,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    ) {
+        switch (testAgg) {
+            case SINGLE:
+                return createSingleAggregateNodesChain(ctx, grpSets, aggCall, inRowType, aggRowType, rowFactory, scan);
+
+            case MAP_REDUCE:
+                return createMapReduceAggregateNodesChain(ctx, grpSets, aggCall, inRowType, aggRowType, rowFactory, scan);
+
+            default:
+                assert false;
+
+                return null;
+        }
+    }
+
+    /** */
+    protected abstract SingleNode<Object[]> createSingleAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall aggCall,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    );
+
+    /** */
+    protected abstract SingleNode<Object[]> createMapReduceAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall call,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    );
+
+    /** */
+    protected Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(
+        ExecutionContext<Object[]> ctx,
+        AggregateCall call,
+        AggregateType type,
+        RelDataType rowType
+    ) {
+        return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType);
+    }
+
+    /** */
+    protected RowHandler.RowFactory<Object[]> rowFactory() {
+        return new RowHandler.RowFactory<Object[]>() {
+            /** */
+            @Override public RowHandler<Object[]> handler() {
+                return ArrayRowHandler.INSTANCE;
+            }
+
+            /** */
+            @Override public Object[] create() {
+                throw new AssertionError();
+            }
+
+            /** */
+            @Override public Object[] create(Object... fields) {
+                return fields;
+            }
+        };
+    }
+
+    /** */
+    enum TestAggregateType {
+        /** */
+        SINGLE,
+
+        /** */
+        MAP_REDUCE
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 4ba60cb..bf2cff0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -21,25 +21,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Supplier;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -58,9 +46,6 @@ import static org.apache.calcite.rel.core.JoinRelType.INNER;
 import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.calcite.rel.core.JoinRelType.SEMI;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.MAP;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.REDUCE;
-import static org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNode.AggregateType.SINGLE;
 
 /**
  *
@@ -468,468 +453,6 @@ public class ExecutionTest extends AbstractExecutionTest {
      *
      */
     @Test
-    public void testAggregateMapReduceAvg() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.AVG,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(double.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType mapType = IgniteMapAggregate.rowType(tf);
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
-        map.register(scan);
-
-        RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
-        reduce.register(map);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
-        root.register(reduce);
-
-        assertTrue(root.hasNext());
-        assertEquals(725d, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMapReduceSum() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.SUM,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType mapType = IgniteMapAggregate.rowType(tf);
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
-        map.register(scan);
-
-        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
-        reduce.register(map);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
-        root.register(reduce);
-
-        assertTrue(root.hasNext());
-        assertEquals(2900, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMapReduceMin() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.MIN,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType mapType = IgniteMapAggregate.rowType(tf);
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
-        map.register(scan);
-
-        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
-        reduce.register(map);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
-        root.register(reduce);
-
-        assertTrue(root.hasNext());
-        assertEquals(200, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMapReduceMax() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.MAX,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType mapType = IgniteMapAggregate.rowType(tf);
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
-        map.register(scan);
-
-        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
-        reduce.register(map);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
-        root.register(reduce);
-
-        assertTrue(root.hasNext());
-        assertEquals(1400, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMapReduceCount() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.COUNT,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType mapType = IgniteMapAggregate.rowType(tf);
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
-        map.register(scan);
-
-        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
-        reduce.register(map);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
-        root.register(reduce);
-
-        assertTrue(root.hasNext());
-        assertEquals(4, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateAvg() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.AVG,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(double.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        assertEquals(725d, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateSum() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.SUM,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        assertEquals(2900, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMin() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.MIN,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        assertEquals(200, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateMax() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.MAX,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(1),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        assertEquals(1400, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateCount() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 200),
-            row("Roman", 300),
-            row("Ivan", 1400),
-            row("Alexey", 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.COUNT,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        assertEquals(4, root.next()[0]);
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void testAggregateCountByGroup() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class, int.class);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 0, 200),
-            row("Roman", 1, 300),
-            row("Ivan", 1, 1400),
-            row("Alexey", 0, 1000)
-        ));
-
-        AggregateCall call = AggregateCall.create(
-            SqlStdOperatorTable.COUNT,
-            false,
-            false,
-            false,
-            ImmutableIntList.of(),
-            -1,
-            RelCollations.EMPTY,
-            tf.createJavaType(int.class),
-            null);
-
-        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of(1));
-
-        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
-        agg.register(scan);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
-        root.register(agg);
-
-        assertTrue(root.hasNext());
-        // TODO needs a sort, relying on an order in a hash table looks strange
-        Assert.assertArrayEquals(row(1, 2), root.next());
-        Assert.assertArrayEquals(row(0, 2), root.next());
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     *
-     */
-    @Test
     public void testCorrelatedNestedLoopJoin() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
         IgniteTypeFactory tf = ctx.getTypeFactory();
@@ -1049,7 +572,7 @@ public class ExecutionTest extends AbstractExecutionTest {
 
     /**
      * Test verifies that an AssertionError thrown from an execution node
-     * proprely handled by a task executor.
+     * properly handled by a task executor.
      */
     @Test
     @SuppressWarnings("ThrowableNotThrown")
@@ -1084,41 +607,11 @@ public class ExecutionTest extends AbstractExecutionTest {
     /**
      *
      */
-    private Object[] row(Object... fields) {
+    protected Object[] row(Object... fields) {
         return fields;
     }
 
     /**
-     *
-     */
-    private Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(ExecutionContext<Object[]> ctx, AggregateCall call,
-        AggregateNode.AggregateType type, RelDataType rowType) {
-        return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType);
-    }
-
-    /**
-     *
-     */
-    private RowFactory<Object[]> rowFactory() {
-        return new RowFactory<Object[]>() {
-            /** */
-            @Override public RowHandler<Object[]> handler() {
-                return ArrayRowHandler.INSTANCE;
-            }
-
-            /** */
-            @Override public Object[] create() {
-                throw new AssertionError();
-            }
-
-            /** */
-            @Override public Object[] create(Object... fields) {
-                return fields;
-            }
-        };
-    }
-
-    /**
      * Node that always throws assertion error except for {@link #close()}
      * and {@link #onRegister(Downstream)} methods.
      */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java
new file mode 100644
index 0000000..a475e00
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateExecutionTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class HashAggregateExecutionTest extends BaseAggregateTest {
+    /** {@inheritDoc} */
+    @Override protected SingleNode<Object[]> createSingleAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall call,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    ) {
+        assert grpSets.size() == 1 : "Test checks only simple GROUP BY";
+
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(
+            ctx,
+            aggRowType,
+            SINGLE,
+            grpSets,
+            accFactory(ctx, call, SINGLE, inRowType),
+            rowFactory
+        );
+
+        agg.register(scan);
+
+        // Collation of the first fields emulates planner behavior:
+        // The group's keys placed on the begin of the output row.
+        RelCollation collation = RelCollations.of(
+            ImmutableIntList.copyOf(
+                IntStream.range(0, F.first(grpSets).cardinality()).boxed().collect(Collectors.toList())
+            )
+        );
+
+        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+        // Create sort node on the top to check sorted results
+        SortNode<Object[]> sort = new SortNode<>(ctx, inRowType, cmp);
+
+        sort.register(agg);
+
+        return sort;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall call,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    ) {
+        assert grpSets.size() == 1 : "Test checks only simple GROUP BY";
+
+        HashAggregateNode<Object[]> aggMap = new HashAggregateNode<>(
+            ctx,
+            aggRowType,
+            MAP,
+            grpSets,
+            accFactory(ctx, call, MAP, inRowType),
+            rowFactory
+        );
+
+        aggMap.register(scan);
+
+        HashAggregateNode<Object[]> aggRdc = new HashAggregateNode<>(
+            ctx,
+            aggRowType,
+            REDUCE,
+            grpSets,
+            accFactory(ctx, call, REDUCE, aggRowType),
+            rowFactory
+        );
+
+        aggRdc.register(aggMap);
+
+        // Collation of the first fields emulates planner behavior:
+        // The group's keys placed on the begin of the output row.
+        RelCollation collation = RelCollations.of(
+            ImmutableIntList.copyOf(
+                IntStream.range(0, F.first(grpSets).cardinality()).boxed().collect(Collectors.toList())
+            )
+        );
+
+        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+        // Create sort node on the top to check sorted results
+        SortNode<Object[]> sort = new SortNode<>(ctx, aggRowType, cmp);
+
+        sort.register(aggRdc);
+
+        return sort;
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
new file mode 100644
index 0000000..5f99aa3
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Before
+    @Override public void setup() throws Exception {
+        nodesCnt = 1;
+        super.setup();
+    }
+
+    /** */
+    @Test
+    public void mapReduceAvg() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.AVG,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(double.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+        HashAggregateNode<Object[]> map = new HashAggregateNode<>(
+            ctx,
+            mapType,
+            MAP,
+            grpSets,
+            accFactory(ctx, call, MAP, rowType),
+            rowFactory()
+        );
+
+        map.register(scan);
+
+        RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
+        HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(
+            ctx,
+            reduceType,
+            REDUCE,
+            grpSets,
+            accFactory(ctx, call, REDUCE, null),
+            rowFactory()
+        );
+
+        reduce.register(map);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+        root.register(reduce);
+
+        assertTrue(root.hasNext());
+        assertEquals(725d, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void mapReduceSum() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.SUM,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+        HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+        map.register(scan);
+
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+        reduce.register(map);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+        root.register(reduce);
+
+        assertTrue(root.hasNext());
+        assertEquals(2900, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void mapReduceMin() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MIN,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+        HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+        map.register(scan);
+
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+        reduce.register(map);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+        root.register(reduce);
+
+        assertTrue(root.hasNext());
+        assertEquals(200, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void mapReduceMax() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MAX,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+        HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+        map.register(scan);
+
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+        reduce.register(map);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+        root.register(reduce);
+
+        assertTrue(root.hasNext());
+        assertEquals(1400, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void mapReduceCount() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.COUNT,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapHashAggregate.rowType(tf);
+        HashAggregateNode<Object[]> map = new HashAggregateNode<>(ctx, mapType, MAP, grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
+        map.register(scan);
+
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> reduce = new HashAggregateNode<>(ctx, reduceType, REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
+        reduce.register(map);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
+        root.register(reduce);
+
+        assertTrue(root.hasNext());
+        assertEquals(4, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void singleAvg() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.AVG,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(double.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        agg.register(scan);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+        root.register(agg);
+
+        assertTrue(root.hasNext());
+        assertEquals(725d, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void singleSum() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.SUM,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        agg.register(scan);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+        root.register(agg);
+
+        assertTrue(root.hasNext());
+        assertEquals(2900, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void singleMin() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MIN,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        agg.register(scan);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+        root.register(agg);
+
+        assertTrue(root.hasNext());
+        assertEquals(200, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    @Test
+    public void singleMax() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.MAX,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(1),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(ctx, aggType, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        agg.register(scan);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+        root.register(agg);
+
+        assertTrue(root.hasNext());
+        assertEquals(1400, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+
+    /** */
+    @Test
+    public void singleCount() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 200),
+            row("Roman", 300),
+            row("Ivan", 1400),
+            row("Alexey", 1000)
+        ));
+
+        AggregateCall call = AggregateCall.create(
+            SqlStdOperatorTable.COUNT,
+            false,
+            false,
+            false,
+            ImmutableIntList.of(),
+            -1,
+            RelCollations.EMPTY,
+            tf.createJavaType(int.class),
+            null);
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        HashAggregateNode<Object[]> agg = new HashAggregateNode<>(
+            ctx,
+            aggType,
+            SINGLE,
+            grpSets,
+            accFactory(ctx, call, SINGLE, rowType),
+            rowFactory()
+        );
+
+        agg.register(scan);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
+        root.register(agg);
+
+        assertTrue(root.hasNext());
+        assertEquals(4, root.next()[0]);
+        assertFalse(root.hasNext());
+    }
+
+    /** */
+    protected Supplier<List<AccumulatorWrapper<Object[]>>> accFactory(
+        ExecutionContext<Object[]> ctx,
+        AggregateCall call,
+        AggregateType type,
+        RelDataType rowType
+    ) {
+        return ctx.expressionFactory().accumulatorsFactory(type, F.asList(call), rowType);
+    }
+
+    /** */
+    protected RowHandler.RowFactory<Object[]> rowFactory() {
+        return new RowHandler.RowFactory<Object[]>() {
+            /** */
+            @Override public RowHandler<Object[]> handler() {
+                return ArrayRowHandler.INSTANCE;
+            }
+
+            /** */
+            @Override public Object[] create() {
+                throw new AssertionError();
+            }
+
+            /** */
+            @Override public Object[] create(Object... fields) {
+                return fields;
+            }
+        };
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java
new file mode 100644
index 0000000..a1a55af
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateExecutionTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class SortAggregateExecutionTest extends BaseAggregateTest {
+    /** {@inheritDoc} */
+    @Override protected SingleNode<Object[]> createSingleAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall call,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    ) {
+        assert grpSets.size() == 1;
+
+        ImmutableBitSet grpSet = F.first(grpSets);
+
+        assert !grpSet.isEmpty() : "Not applicable for sort aggregate";
+
+        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(grpSet.asList()));
+
+        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+        SortNode<Object[]> sort = new SortNode<>(ctx, inRowType, cmp);
+
+        sort.register(scan);
+
+        SortAggregateNode<Object[]> agg = new SortAggregateNode<>(
+            ctx,
+            aggRowType,
+            SINGLE,
+            grpSet,
+            accFactory(ctx, call, SINGLE, inRowType),
+            rowFactory,
+            cmp
+        );
+
+        agg.register(sort);
+
+        return agg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
+        ExecutionContext<Object[]> ctx,
+        ImmutableList<ImmutableBitSet> grpSets,
+        AggregateCall call,
+        RelDataType inRowType,
+        RelDataType aggRowType,
+        RowHandler.RowFactory<Object[]> rowFactory,
+        ScanNode<Object[]> scan
+    ) {
+        assert grpSets.size() == 1;
+
+        ImmutableBitSet grpSet = F.first(grpSets);
+
+        assert !grpSet.isEmpty() : "Not applicable for sort aggregate";
+
+        RelCollation collation = RelCollations.of(ImmutableIntList.copyOf(grpSet.asList()));
+
+        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+        SortNode<Object[]> sort = new SortNode<>(ctx, inRowType, cmp);
+
+        sort.register(scan);
+
+        SortAggregateNode<Object[]> aggMap = new SortAggregateNode<>(
+            ctx,
+            aggRowType,
+            MAP,
+            grpSet,
+            accFactory(ctx, call, MAP, inRowType),
+            rowFactory,
+            cmp
+        );
+
+        aggMap.register(sort);
+
+        // The group's fields placed on the begin of the output row (planner
+        // does this by Projection node for aggregate input).
+        // Hash aggregate doesn't use groups set on reducer because send GroupKey as object.
+        ImmutableIntList reduceGrpFields = ImmutableIntList.copyOf(
+            IntStream.range(0, grpSet.cardinality()).boxed().collect(Collectors.toList())
+        );
+
+        RelCollation rdcCollation = RelCollations.of(reduceGrpFields);
+
+        Comparator<Object[]> rdcCmp = ctx.expressionFactory().comparator(rdcCollation);
+
+        SortAggregateNode<Object[]> aggRdc = new SortAggregateNode<>(
+            ctx,
+            aggRowType,
+            REDUCE,
+            ImmutableBitSet.of(reduceGrpFields),
+            accFactory(ctx, call, REDUCE, aggRowType),
+            rowFactory,
+            rdcCmp
+        );
+
+        aggRdc.register(aggMap);
+
+        return aggRdc;
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 861970f..4ec8f16 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -176,6 +176,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     public static <T extends RelNode> List<T> findNodes(RelNode plan, Predicate<RelNode> pred) {
         List<T> ret = new ArrayList<>();
 
+        if (pred.test(plan))
+            ret.add((T)plan);
+
         plan.childrenAccept(
             new RelVisitor() {
                 @Override public void visit(RelNode node, int ordinal, RelNode parent) {
@@ -325,6 +328,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
 
     /** */
     protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+        assertNotNull(rel);
+
         rel = Cloner.clone(rel);
 
         SchemaPlus schema = createRootSchema(false)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
new file mode 100644
index 0000000..f630ade
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregateBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@SuppressWarnings({"TypeMayBeWeakened"})
+@RunWith(Parameterized.class)
+public class AggregatePlannerTest extends AbstractPlannerTest {
+    /** Algorithm. */
+    @Parameterized.Parameter
+    public AggregateAlgorithm algo;
+
+    /** */
+    @Parameterized.Parameters(name = "Algorithm = {0}")
+    public static List<Object[]> parameters() {
+        return Stream.of(AggregateAlgorithm.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void singleWithoutIndex() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT AVG(val0) FROM test GROUP BY grp0";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            algo.ruleToDisable
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(agg.getAggCallList()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        if (algo == AggregateAlgorithm.SORT)
+            assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void singleWithIndex() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(3, 4)), "grp0_grp1");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT AVG(val0) FILTER(WHERE val1 > 10) FROM test GROUP BY grp0";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            algo.ruleToDisable
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteAggregateBase agg = findFirstNode(phys, byClass(algo.single));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), agg);
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(agg.getAggCallList()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        if (algo == AggregateAlgorithm.SORT)
+            assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void mapReduceGroupBy() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "test", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT AVG(val0) FILTER (WHERE val1 > 10) FROM test GROUP BY grp1, grp0";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            algo.ruleToDisable
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+        IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(rdcAgg.aggregateCalls()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(mapAgg.getAggCallList()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        if (algo == AggregateAlgorithm.SORT)
+            assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Ignore("Single aggregates must be disabled by hint: https://issues.apache.org/jira/browse/IGNITE-14274")
+    public void mapReduceDistinctWithIndex() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "test", "hash");
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT DISTINCT val0, val1 FROM test";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            algo.ruleToDisable
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteAggregate mapAgg = findFirstNode(phys, byClass(algo.map));
+        IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys, SqlExplainLevel.ALL_ATTRIBUTES), rdcAgg);
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+        Assert.assertTrue(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.isEmpty(rdcAgg.aggregateCalls()));
+
+        Assert.assertTrue(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.isEmpty(mapAgg.getAggCallList()));
+
+        if (algo == AggregateAlgorithm.SORT)
+            assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+    }
+
+    /** */
+    @Test
+    public void notApplicableForSortAggregate() {
+        if (algo == AggregateAlgorithm.HASH)
+            return;
+
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable tbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL0", f.createJavaType(Integer.class))
+                .add("VAL1", f.createJavaType(Integer.class))
+                .add("GRP0", f.createJavaType(Integer.class))
+                .add("GRP1", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "test", "hash");
+            }
+        }
+            .addIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "val0_val1");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT MIN(val0) FROM test";
+
+        GridTestUtils.assertThrows(log,
+            () -> physicalPlan(
+                sql,
+                publicSchema,
+                "HashAggregateConverterRule"
+            ),
+            RelOptPlanner.CannotPlanException.class,
+            "There are not enough rules to produce a node with desired properties"
+        );
+    }
+
+    /** */
+    enum AggregateAlgorithm {
+        /** */
+        SORT(
+            IgniteSortAggregate.class,
+            IgniteMapSortAggregate.class,
+            IgniteReduceSortAggregate.class,
+            "HashAggregateConverterRule"
+        ),
+
+        /** */
+        HASH(
+            IgniteHashAggregate.class,
+            IgniteMapHashAggregate.class,
+            IgniteReduceHashAggregate.class,
+            "SortAggregateConverterRule"
+        );
+
+        /** */
+        public final Class<? extends IgniteAggregateBase> single;
+
+        /** */
+        public final Class<? extends IgniteAggregate> map;
+
+        /** */
+        public final Class<? extends IgniteReduceAggregateBase> reduce;
+
+        /** */
+        public final String ruleToDisable;
+
+        /** */
+        AggregateAlgorithm(
+            Class<? extends IgniteAggregateBase> single,
+            Class<? extends IgniteAggregate> map,
+            Class<? extends IgniteReduceAggregateBase> reduce,
+            String ruleToDisable) {
+            this.single = single;
+            this.map = map;
+            this.reduce = reduce;
+            this.ruleToDisable = ruleToDisable;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java
new file mode 100644
index 0000000..ff6f73e
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashAggregateTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class HashAggregateTest extends AbstractPlannerTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable employer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("SALARY", f.createJavaType(Double.class))
+                .build()) {
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2),
+                    select(nodes, 2, 0),
+                    select(nodes, 0, 1),
+                    select(nodes, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Employers", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("EMPS", employer);
+
+        String sql = "SELECT * FROM emps WHERE emps.salary = (SELECT AVG(emps.salary) FROM emps)";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema
+        );
+
+        assertNotNull(phys);
+
+        IgniteReduceHashAggregate rdcAgg = findFirstNode(phys, byClass(IgniteReduceHashAggregate.class));
+        IgniteMapHashAggregate mapAgg = findFirstNode(phys, byClass(IgniteMapHashAggregate.class));
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), rdcAgg);
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), mapAgg);
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(rdcAgg.aggregateCalls()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        Assert.assertThat(
+            "Invalid plan\n" + RelOptUtil.toString(phys),
+            F.first(mapAgg.getAggCallList()).getAggregation(),
+            IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
+
+        System.out.println("+++\n" + RelOptUtil.toString(phys));
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a14060a..a15470f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -631,99 +631,6 @@ public class PlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testAggregate() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable employer = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("NAME", f.createJavaType(String.class))
-                .add("SALARY", f.createJavaType(Double.class))
-                .build()) {
-
-            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
-                return ColocationGroup.forAssignments(Arrays.asList(
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2),
-                    select(nodes, 2, 0),
-                    select(nodes, 0, 1),
-                    select(nodes, 1, 2)
-                ));
-            }
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "Employers", "hash");
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("EMPS", employer);
-
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
-        String sql = "SELECT * FROM emps WHERE emps.salary = (SELECT AVG(emps.salary) FROM emps)";
-
-        RelTraitDef<?>[] traitDefs = {
-            DistributionTraitDef.INSTANCE,
-            ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            RewindabilityTraitDef.INSTANCE,
-            CorrelationTraitDef.INSTANCE
-        };
-
-        PlanningContext ctx = PlanningContext.builder()
-            .localNodeId(F.first(nodes))
-            .originatingNodeId(F.first(nodes))
-            .parentContext(Contexts.empty())
-            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .build())
-            .logger(log)
-            .query(sql)
-            .parameters(2)
-            .topologyVersion(AffinityTopologyVersion.NONE)
-            .build();
-
-        RelNode root;
-
-        try (IgnitePlanner planner = ctx.planner()) {
-            assertNotNull(planner);
-
-            String qry = ctx.query();
-
-            assertNotNull(qry);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(qry);
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            root = planner.convert(sqlNode);
-
-            // Transformation chain
-            root = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, root.getTraitSet(), root);
-
-            // Transformation chain
-            RelTraitSet desired = root.getCluster().traitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            root = planner.transform(PlannerPhase.OPTIMIZATION, desired, root);
-        }
-
-        assertNotNull(root);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
     public void testHepPlaner() throws Exception {
         IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 4de7817..2925782 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -19,9 +19,12 @@ package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -37,6 +40,9 @@ import org.junit.runners.Suite;
     NestedLoopJoinExecutionTest.class,
     TableSpoolExecutionTest.class,
     IndexSpoolExecutionTest.class,
+    HashAggregateExecutionTest.class,
+    HashAggregateSingleGroupExecutionTest.class,
+    SortAggregateExecutionTest.class,
 })
 public class ExecutionTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 088e15e..9c656ca 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,10 +22,11 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondary
 import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
 import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
+import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
+import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
@@ -56,7 +57,8 @@ import org.junit.runners.Suite;
     LimitOffsetTest.class,
     SqlFieldsQueryUsageTest.class,
     AggregatesIntegrationTest.class,
-    MetadataIntegrationTest.class
+    MetadataIntegrationTest.class,
+    SortAggregateIntegrationTest.class,
 })
 public class IgniteCalciteTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index ad329f1..959021f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -32,7 +33,8 @@ import org.junit.runners.Suite;
     PlannerTest.class,
     CorrelatedNestedLoopJoinPlannerTest.class,
     TableSpoolPlannerTest.class,
-    IndexSpoolPlannerTest.class
+    IndexSpoolPlannerTest.class,
+    AggregatePlannerTest.class,
 })
 public class PlannerTestSuite {
 }