You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/10/08 04:37:12 UTC

[1/2] TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)

Repository: tajo
Updated Branches:
  refs/heads/master de28c8294 -> 0dfa3972c


http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 919ac9b..1f348ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,13 +43,18 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -810,9 +815,30 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         if (grpNode.getType() == NodeType.GROUP_BY) {
           hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
         } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
-          hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
+          // Find current distinct stage node.
+          DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+          if (distinctNode == null) {
+            LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+            distinctNode = (DistinctGroupbyNode)grpNode;
+          }
+          hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+          Enforcer enforcer = subQuery.getBlock().getEnforcer();
+          if (enforcer == null) {
+            LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null.");
+          }
+          EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+          if (property != null) {
+            if (property.getDistinct().getIsMultipleAggregation()) {
+              MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+              if (stage != MultipleAggregationStage.THRID_STAGE) {
+                hasGroupColumns = true;
+              }
+            }
+          }
         }
         if (!hasGroupColumns) {
+          LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
           return 1;
         } else {
           long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bde2459..2760301 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -315,6 +315,12 @@ message DistinctGroupbyEnforcer {
     SORT_AGGREGATION = 1;
   }
 
+  enum MultipleAggregationStage {
+    FIRST_STAGE = 0;
+    SECOND_STAGE = 1;
+    THRID_STAGE = 3;
+  }
+
   message SortSpecArray {
     required int32 pid = 1;
     repeated SortSpecProto sortSpecs = 2;
@@ -322,6 +328,8 @@ message DistinctGroupbyEnforcer {
   required int32 pid = 1;
   required DistinctAggregationAlgorithm algorithm = 2;
   repeated SortSpecArray sortSpecArrays = 3;
+  required bool isMultipleAggregation = 4 [default = false];
+  optional MultipleAggregationStage multipleAggregationStage = 5;
 }
 
 message EnforcerProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index fccec26..8b9f9f7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -20,10 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -34,9 +31,14 @@ import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
+import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.ResultSet;
 import java.util.*;
@@ -44,11 +46,33 @@ import java.util.*;
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
 public class TestGroupByQuery extends QueryTestCaseBase {
   private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
 
-  public TestGroupByQuery() throws Exception {
+  public TestGroupByQuery(String groupByOption) throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
+
+    Map<String, String> variables = new HashMap<String, String>();
+    if (groupByOption.equals("MultiLevel")) {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "true");
+    } else {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false");
+    }
+    client.updateSessionVariables(variables);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    client.unsetSessionVariables(TUtil.newList(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname()));
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {"MultiLevel"},
+        {"No-MultiLevel"},
+    });
   }
 
   @Test
@@ -285,6 +309,24 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testDistinctAggregation8() throws Exception {
+    /*
+    select
+    sum(distinct l_orderkey),
+        l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+        count(distinct l_partkey),
+        sum(l_orderkey)
+    from
+        lineitem
+    group by
+    l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+    */
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
   public final void testDistinctAggregationWithHaving1() throws Exception {
     // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem
     // group by l_linenumber having sum(distinct l_orderkey) >= 6;
@@ -343,6 +385,14 @@ public class TestGroupByQuery extends QueryTestCaseBase {
     assertResultSet(res, "testDistinctAggregation_case8.result");
     res.close();
 
+    res = executeFile("testDistinctAggregation_case9.sql");
+    assertResultSet(res, "testDistinctAggregation_case9.result");
+    res.close();
+
+    res = executeFile("testDistinctAggregation_case10.sql");
+    assertResultSet(res, "testDistinctAggregation_case10.result");
+    res.close();
+
     // case9
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
new file mode 100644
index 0000000..0553d06
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
@@ -0,0 +1,9 @@
+select
+  sum(distinct l_orderkey),
+  l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+  count(distinct l_partkey),
+  sum(l_orderkey)
+from
+  lineitem
+group by
+  l_linenumber, l_returnflag, l_linestatus, l_shipdate;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
new file mode 100644
index 0000000..6ab7c25
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
@@ -0,0 +1,5 @@
+select sum(cnt1), sum(sum2)
+from (
+  select o_orderdate, count(distinct o_orderpriority), count(distinct o_orderkey) cnt1, sum(o_totalprice) sum2
+  from orders group by o_orderdate
+) a
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
new file mode 100644
index 0000000..6265599
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
@@ -0,0 +1,11 @@
+select
+    lineitem.l_orderkey as l_orderkey,
+    count(distinct lineitem.l_partkey) as cnt1,
+    sum(lineitem.l_quantity + lineitem.l_linenumber)/count(distinct lineitem.l_suppkey) as value2,
+    lineitem.l_partkey as l_partkey,
+    avg(lineitem.l_quantity) as avg1,
+    count(distinct lineitem.l_suppkey) as cnt2
+from
+    lineitem
+group by
+    lineitem.l_orderkey, lineitem.l_partkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
new file mode 100644
index 0000000..519390d
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
@@ -0,0 +1,7 @@
+?sum,l_linenumber,l_returnflag,l_linestatus,l_shipdate,?count_1,?sum_2
+-------------------------------
+1,1,N,O,1996-03-13,1,1
+2,1,N,O,1997-01-28,1,2
+3,1,R,F,1994-02-02,1,3
+1,2,N,O,1996-04-12,1,1
+3,2,R,F,1993-11-09,1,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
new file mode 100644
index 0000000..2035d9f
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
@@ -0,0 +1,3 @@
+?sum,?sum_1
+-------------------------------
+3,414440.9
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
new file mode 100644
index 0000000..506eea0
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
@@ -0,0 +1,6 @@
+l_orderkey,cnt1,value2,l_partkey,avg1,cnt2
+-------------------------------
+1,1,28.0,1,26.5,2
+2,1,39.0,2,38.0,1
+3,1,46.0,2,45.0,1
+3,1,51.0,3,49.0,1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1..25f1ae7 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
 \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
 \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
 \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a4..084c105 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -42,10 +42,6 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
   @SuppressWarnings("unused")
   private final boolean[] nullFirsts;  
 
-  private Datum left;
-  private Datum right;
-  private int compVal;
-
   /**
    * @param schema The schema of input tuples
    * @param sortKeys The description of sort keys
@@ -88,6 +84,10 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
 
   @Override
   public int compare(Tuple tuple1, Tuple tuple2) {
+    Datum left = null;
+    Datum right = null;
+    int compVal = 0;
+
     for (int i = 0; i < sortKeyIds.length; i++) {
       left = tuple1.get(sortKeyIds[i]);
       right = tuple2.get(sortKeyIds[i]);


[2/2] git commit: TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)

Posted by bl...@apache.org.
TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)

Closes #136


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dfa3972
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dfa3972
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dfa3972

Branch: refs/heads/master
Commit: 0dfa3972c6a52d785b8e55f91d0906456a3926b3
Parents: de28c82
Author: Jaehwa Jung <bl...@apache.org>
Authored: Wed Oct 8 11:35:45 2014 +0900
Committer: Jaehwa Jung <bl...@apache.org>
Committed: Wed Oct 8 11:35:45 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../main/java/org/apache/tajo/SessionVars.java  |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +
 .../eval/AggregationFunctionCallEval.java       |  38 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  56 ++-
 .../tajo/engine/planner/enforce/Enforcer.java   |  13 +
 .../engine/planner/global/GlobalPlanner.java    |  58 +--
 .../global/builder/DistinctGroupbyBuilder.java  | 329 ++++++++++++-
 .../planner/logical/DistinctGroupbyNode.java    |  52 +-
 .../DistinctGroupbyFirstAggregationExec.java    | 476 +++++++++++++++++++
 .../DistinctGroupbySecondAggregationExec.java   | 295 ++++++++++++
 .../DistinctGroupbyThirdAggregationExec.java    | 304 ++++++++++++
 .../tajo/master/querymaster/Repartitioner.java  |  42 +-
 .../tajo/master/querymaster/SubQuery.java       |  28 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   8 +
 .../tajo/engine/query/TestGroupByQuery.java     |  60 ++-
 .../testDistinctAggregation8.sql                |   9 +
 .../testDistinctAggregation_case10.sql          |   5 +
 .../testDistinctAggregation_case9.sql           |  11 +
 .../testDistinctAggregation8.result             |   7 +
 .../testDistinctAggregation_case10.result       |   3 +
 .../testDistinctAggregation_case9.result        |   6 +
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 .../apache/tajo/storage/TupleComparator.java    |   8 +-
 24 files changed, 1735 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index bff9583..7aa7a0c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
+
     TAJO-1093: DateTimeFormat.to_char() is slower than SimpleDateFormat.format().
     (Jihun Kang via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index cc875b2..1229849 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey {
   TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
       "shuffle output size for partition table write (mb)", DEFAULT),
 
+  GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT),
+
   // for physical Executors
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
   HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f9f5e4a..66d3030 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -316,6 +316,8 @@ public class TajoConf extends Configuration {
     $DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
     $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),
 
+    $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
+
     // for physical Executors
     $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
     $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
index ab18aa9..3216519 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -30,7 +30,10 @@ import org.apache.tajo.storage.VTuple;
 
 public class AggregationFunctionCallEval extends FunctionEval implements Cloneable {
   @Expose protected AggFunction instance;
-  @Expose boolean firstPhase = false;
+  @Expose boolean intermediatePhase = false;
+  @Expose boolean finalPhase = true;
+  @Expose String alias;
+
   private Tuple params;
 
   protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) {
@@ -58,7 +61,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
       }
     }
 
-    if (firstPhase) {
+    if (!intermediatePhase && !finalPhase) {
+      // firstPhase
       instance.eval(context, params);
     } else {
       instance.merge(context, params);
@@ -71,7 +75,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
   }
 
   public Datum terminate(FunctionContext context) {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResult(context);
     } else {
       return instance.terminate(context);
@@ -80,18 +84,40 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
 
   @Override
   public DataType getValueType() {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResultType();
     } else {
       return funcDesc.getReturnType();
     }
   }
 
+  public void setAlias(String alias) { this.alias = alias; }
+
+  public String getAlias() { return  this.alias; }
+
   public Object clone() throws CloneNotSupportedException {
-    return super.clone();
+    AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone();
+
+    clone.finalPhase = finalPhase;
+    clone.intermediatePhase = intermediatePhase;
+    clone.alias = alias;
+    clone.instance = (AggFunction)instance.clone();
+
+    return clone;
   }
 
   public void setFirstPhase() {
-    this.firstPhase = true;
+    this.finalPhase = false;
+    this.intermediatePhase = false;
+  }
+
+  public void setFinalPhase() {
+    this.finalPhase = true;
+    this.intermediatePhase = false;
+  }
+
+  public void setIntermediatePhase() {
+    this.finalPhase = false;
+    this.intermediatePhase = true;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2730202..6b1c65c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageConstants;
@@ -56,6 +57,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 
@@ -1047,17 +1049,61 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
     if (property != null) {
-      DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
-      if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
-        return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+      if (property.getDistinct().getIsMultipleAggregation()) {
+        MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+
+        if (stage == MultipleAggregationStage.FIRST_STAGE) {
+          return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp);
+        } else if (stage == MultipleAggregationStage.SECOND_STAGE) {
+          return new DistinctGroupbySecondAggregationExec(context, distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 2));
+        } else {
+          return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 3));
+        }
       } else {
-        return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+        DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
+        if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+          return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+        } else {
+          return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+        }
       }
     } else {
       return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
     }
   }
 
+  private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
+                                                    DistinctGroupbyNode distinctNode,
+                                                    PhysicalExec subOp,
+                                                    int phase) throws IOException {
+    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    //2 phase: seq, groupby columns, distinct1 keys, distinct2 keys,
+    //3 phase: groupby columns, seq, distinct1 keys, distinct2 keys,
+    List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
+    if (phase == 2) {
+      sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (Column eachColumn: distinctNode.getGroupingColumns()) {
+      sortSpecs.add(new SortSpec(eachColumn));
+    }
+    if (phase == 3) {
+      sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
+      for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+        sortSpecs.add(new SortSpec(eachColumn));
+      }
+    }
+    sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{}));
+    sortNode.setInSchema(distinctNode.getInSchema());
+    sortNode.setOutSchema(distinctNode.getInSchema());
+    ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp);
+
+    return sortExec;
+  }
+
   private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
       DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
     return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
@@ -1145,7 +1191,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   }
 
-  private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+  public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
     if (enforcer == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 031569e..e2d7744 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
@@ -135,13 +136,25 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceDistinctAggregation(int pid,
                                          DistinctAggregationAlgorithm algorithm,
                                          List<SortSpecArray> sortSpecArrays) {
+    enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays);
+  }
+
+  public void enforceDistinctAggregation(int pid,
+                                         boolean isMultipleAggregation,
+                                         MultipleAggregationStage stage,
+                                         DistinctAggregationAlgorithm algorithm,
+                                         List<SortSpecArray> sortSpecArrays) {
     EnforceProperty.Builder builder = newProperty();
     DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
     enforce.setPid(pid);
+    enforce.setIsMultipleAggregation(isMultipleAggregation);
     enforce.setAlgorithm(algorithm);
     if (sortSpecArrays != null) {
       enforce.addAllSortSpecArrays(sortSpecArrays);
     }
+    if (stage != null) {
+      enforce.setMultipleAggregationStage(stage);
+    }
 
     builder.setType(EnforceType.DISTINCT_GROUP_BY);
     builder.setDistinct(enforce.build());

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 432589b..01e02d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -658,47 +658,6 @@ public class GlobalPlanner {
     return rewritten;
   }
 
-  public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
-                                                  DistinctGroupbyNode firstPhaseGroupBy,
-                                                  DistinctGroupbyNode secondPhaseGroupBy) {
-    DataChannel lastDataChannel = null;
-
-    // It pushes down the first phase group-by operator into all child blocks.
-    //
-    // (second phase)    G (currentBlock)
-    //                  /|\
-    //                / / | \
-    // (first phase) G G  G  G (child block)
-
-    // They are already connected one another.
-    // So, we don't need to connect them again.
-    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
-      if (firstPhaseGroupBy.isEmptyGrouping()) {
-        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
-      } else {
-        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
-      }
-      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
-      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-
-      // Why must firstPhaseGroupby be copied?
-      //
-      // A groupby in each execution block can have different child.
-      // It affects groupby's input schema.
-      DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
-      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
-      childBlock.setPlan(firstPhaseGroupbyCopy);
-
-      // just keep the last data channel.
-      lastDataChannel = dataChannel;
-    }
-
-    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
-    secondPhaseGroupBy.setChild(scanNode);
-    lastBlock.setPlan(secondPhaseGroupBy);
-    return lastBlock;
-  }
-
   /**
    * If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows:
    *
@@ -824,8 +783,20 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock;
 
     if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
-      DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
-      return builder.buildPlan(context, lastBlock, groupbyNode);
+      boolean multiLevelEnabled = context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED);
+
+      if (multiLevelEnabled) {
+        if (PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) {
+          DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+          return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode);
+        } else {
+          DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+          return builder.buildPlan(context, lastBlock, groupbyNode);
+        }
+      } else {
+        DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+        return builder.buildPlan(context, lastBlock, groupbyNode);
+      }
     } else {
       GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 
@@ -968,6 +939,7 @@ public class GlobalPlanner {
         firstPhaseEvals[i].setFirstPhase();
         firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]);
         FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+        secondPhaseEvals[i].setFinalPhase();
         secondPhaseEvals[i].setArgs(new EvalNode[] {param});
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 8727b84..cbe2d7e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -36,11 +37,14 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
@@ -56,6 +60,255 @@ public class DistinctGroupbyBuilder {
     this.globalPlanner = globalPlanner;
   }
 
+  public ExecutionBlock buildMultiLevelPlan(GlobalPlanContext context,
+                                            ExecutionBlock latestExecBlock,
+                                            LogicalNode currentNode) throws PlanningException {
+    try {
+      GroupbyNode groupbyNode = (GroupbyNode) currentNode;
+
+      LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+      DistinctGroupbyNode baseDistinctNode =
+          buildMultiLevelBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
+      baseDistinctNode.setGroupbyPlan(groupbyNode);
+
+      // Set total Aggregation Functions.
+      AggregationFunctionCallEval[] aggFunctions =
+          new AggregationFunctionCallEval[groupbyNode.getAggFunctions().length];
+
+      for (int i = 0; i < aggFunctions.length; i++) {
+        aggFunctions[i] = (AggregationFunctionCallEval) groupbyNode.getAggFunctions()[i].clone();
+        aggFunctions[i].setFirstPhase();
+        // If there is not grouping column, we can't find column alias.
+        // Thus we should find the alias at Groupbynode output schema.
+        if (groupbyNode.getGroupingColumns().length == 0
+            && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) {
+          aggFunctions[i].setAlias(groupbyNode.getOutSchema().getColumn(i).getQualifiedName());
+        }
+      }
+
+      if (groupbyNode.getGroupingColumns().length == 0
+          && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) {
+        groupbyNode.setAggFunctions(aggFunctions);
+      }
+
+      baseDistinctNode.setAggFunctions(aggFunctions);
+
+      // Create First, SecondStage's Node using baseNode
+      DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode secondStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+
+      // Set second, third non-distinct aggregation's eval node to field eval
+      GroupbyNode lastGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setIntermediatePhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+      lastGroupbyNode = thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setFirstPhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+
+      // Set in & out schema for each DistinctGroupbyNode.
+      secondStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      secondStageDistinctNode.setOutSchema(firstStageDistinctNode.getOutSchema());
+      thirdStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      thirdStageDistinctNode.setOutSchema(groupbyNode.getOutSchema());
+
+      // Set latestExecBlock's plan with firstDistinctNode
+      latestExecBlock.setPlan(firstStageDistinctNode);
+
+      // Make SecondStage ExecutionBlock
+      ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock();
+
+      // Make ThirdStage ExecutionBlock
+      ExecutionBlock thirdStageBlock = context.getPlan().newExecutionBlock();
+
+      // Set Enforcer
+      setMultiStageAggregationEnforcer(latestExecBlock, firstStageDistinctNode, secondStageBlock,
+          secondStageDistinctNode, thirdStageBlock, thirdStageDistinctNode);
+
+      //Create data channel FirstStage to SecondStage
+      DataChannel firstChannel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, 32);
+
+      firstChannel.setShuffleKeys(firstStageDistinctNode.getFirstStageShuffleKeyColumns());
+      firstChannel.setSchema(firstStageDistinctNode.getOutSchema());
+      firstChannel.setStoreType(globalPlanner.getStoreType());
+
+      ScanNode scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), firstChannel);
+      secondStageDistinctNode.setChild(scanNode);
+
+      secondStageBlock.setPlan(secondStageDistinctNode);
+
+      context.getPlan().addConnect(firstChannel);
+
+      DataChannel secondChannel;
+      //Create data channel SecondStage to ThirdStage
+      if (groupbyNode.isEmptyGrouping()) {
+        secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 1);
+        secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns());
+      } else {
+        secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 32);
+        secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns());
+      }
+      secondChannel.setSchema(secondStageDistinctNode.getOutSchema());
+      secondChannel.setStoreType(globalPlanner.getStoreType());
+
+      scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), secondChannel);
+      thirdStageDistinctNode.setChild(scanNode);
+
+      thirdStageBlock.setPlan(thirdStageDistinctNode);
+
+      context.getPlan().addConnect(secondChannel);
+
+      if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
+        buildDistinctGroupbyAndUnionPlan(
+            context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode);
+      }
+
+      return thirdStageBlock;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new PlanningException(e);
+    }
+  }
+
+  private DistinctGroupbyNode buildMultiLevelBaseDistinctGroupByNode(GlobalPlanContext context,
+                                                                     ExecutionBlock latestExecBlock,
+                                                                     GroupbyNode groupbyNode) {
+    LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+    /*
+     Making DistinctGroupbyNode from GroupByNode
+     select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+     => DistinctGroupbyNode
+        Distinct Seq
+        grouping key = col1
+        Sub GroupbyNodes
+         - GroupByNode1: grouping(col2), expr(count distinct col2)
+         - GroupByNode2: grouping(col3), expr(count distinct col3)
+         - GroupByNode3: expr(sum col4)
+    */
+    List<Column> originalGroupingColumns = Arrays.asList(groupbyNode.getGroupingColumns());
+
+    List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>();
+
+    List<AggregationFunctionCallEval> otherAggregationFunctionCallEvals = new ArrayList<AggregationFunctionCallEval>();
+    List<Target> otherAggregationFunctionTargets = new ArrayList<Target>();
+
+    //distinct columns -> GroupbyNode
+    Map<String, DistinctGroupbyNodeBuildInfo> distinctNodeBuildInfos = new HashMap<String, DistinctGroupbyNodeBuildInfo>();
+    AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions();
+    for (int aggIdx = 0; aggIdx < aggFunctions.length; aggIdx++) {
+      AggregationFunctionCallEval aggFunction = aggFunctions[aggIdx];
+      aggFunction.setFirstPhase();
+      Target originAggFunctionTarget = groupbyNode.getTargets()[originalGroupingColumns.size() + aggIdx];
+      Target aggFunctionTarget =
+          new Target(new FieldEval(originAggFunctionTarget.getEvalTree().getName(), aggFunction.getValueType()));
+
+      if (aggFunction.isDistinct()) {
+        // Create or reuse Groupby node for each Distinct expression.
+        LinkedHashSet<Column> groupbyUniqColumns = EvalTreeUtil.findUniqueColumns(aggFunction);
+        String groupbyMapKey = EvalTreeUtil.columnsToStr(groupbyUniqColumns);
+        DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(groupbyMapKey);
+        if (buildInfo == null) {
+          GroupbyNode distinctGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+          buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode);
+          distinctNodeBuildInfos.put(groupbyMapKey, buildInfo);
+
+          // Grouping columns are GROUP BY clause's column + Distinct column.
+          List<Column> groupingColumns = new ArrayList<Column>();
+          for (Column eachGroupingColumn: groupbyUniqColumns) {
+            if (!groupingColumns.contains(eachGroupingColumn)) {
+              groupingColumns.add(eachGroupingColumn);
+            }
+          }
+          distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[]{}));
+        }
+        buildInfo.addAggFunction(aggFunction);
+        buildInfo.addAggFunctionTarget(aggFunctionTarget);
+      } else {
+        otherAggregationFunctionCallEvals.add(aggFunction);
+        otherAggregationFunctionTargets.add(aggFunctionTarget);
+      }
+    }
+
+    List<Target> baseGroupByTargets = new ArrayList<Target>();
+    baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", Type.INT2))));
+    for (Column column : originalGroupingColumns) {
+      baseGroupByTargets.add(new Target(new FieldEval(column)));
+    }
+
+    //Add child groupby node for each Distinct clause
+    for (String eachKey: distinctNodeBuildInfos.keySet()) {
+      DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(eachKey);
+      GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode();
+      List<AggregationFunctionCallEval> groupbyAggFunctions = buildInfo.getAggFunctions();
+      String [] firstPhaseEvalNames = new String[groupbyAggFunctions.size()];
+      int index = 0;
+      for (AggregationFunctionCallEval eachCallEval: groupbyAggFunctions) {
+        firstPhaseEvalNames[index++] = eachCallEval.getName();
+      }
+
+      Target[] targets = new Target[eachGroupbyNode.getGroupingColumns().length + groupbyAggFunctions.size()];
+      int targetIdx = 0;
+
+      for (Column column : eachGroupbyNode.getGroupingColumns()) {
+        Target target = new Target(new FieldEval(column));
+        targets[targetIdx++] = target;
+        baseGroupByTargets.add(target);
+      }
+      for (Target eachAggFunctionTarget: buildInfo.getAggFunctionTargets()) {
+        targets[targetIdx++] = eachAggFunctionTarget;
+      }
+      eachGroupbyNode.setTargets(targets);
+      eachGroupbyNode.setAggFunctions(groupbyAggFunctions.toArray(new AggregationFunctionCallEval[]{}));
+      eachGroupbyNode.setDistinct(true);
+      eachGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(eachGroupbyNode);
+    }
+
+    // Merge other aggregation function to a GroupBy Node.
+    if (!otherAggregationFunctionCallEvals.isEmpty()) {
+      // finally this aggregation output tuple's order is GROUP_BY_COL1, COL2, .... + AGG_VALUE, SUM_VALUE, ...
+      GroupbyNode otherGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+
+      Target[] targets = new Target[otherAggregationFunctionTargets.size()];
+      int targetIdx = 0;
+      for (Target eachTarget : otherAggregationFunctionTargets) {
+        targets[targetIdx++] = eachTarget;
+        baseGroupByTargets.add(eachTarget);
+      }
+
+      otherGroupbyNode.setTargets(targets);
+      otherGroupbyNode.setGroupingColumns(new Column[]{});
+      otherGroupbyNode.setAggFunctions(otherAggregationFunctionCallEvals.toArray(new AggregationFunctionCallEval[]{}));
+      otherGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(otherGroupbyNode);
+    }
+
+    DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
+    baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{}));
+    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setInSchema(groupbyNode.getInSchema());
+    baseDistinctNode.setChild(groupbyNode.getChild());
+
+    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+
+    return baseDistinctNode;
+  }
 
   public ExecutionBlock buildPlan(GlobalPlanContext context,
                                   ExecutionBlock latestExecBlock,
@@ -66,7 +319,7 @@ public class DistinctGroupbyBuilder {
       DistinctGroupbyNode baseDistinctNode = buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
 
       // Create First, SecondStage's Node using baseNode
-      DistinctGroupbyNode[] distinctNodes = createMultiPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
+      DistinctGroupbyNode[] distinctNodes = createTwoPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
 
       DistinctGroupbyNode firstStageDistinctNode = distinctNodes[0];
       DistinctGroupbyNode secondStageDistinctNode = distinctNodes[1];
@@ -100,7 +353,7 @@ public class DistinctGroupbyBuilder {
       context.getPlan().addConnect(channel);
 
       if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
-        globalPlanner.buildDistinctGroupbyAndUnionPlan(
+        buildDistinctGroupbyAndUnionPlan(
             context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode);
       }
 
@@ -162,6 +415,7 @@ public class DistinctGroupbyBuilder {
         buildInfo.addAggFunction(aggFunction);
         buildInfo.addAggFunctionTarget(aggFunctionTarget);
       } else {
+        aggFunction.setFinalPhase();
         otherAggregationFunctionCallEvals.add(aggFunction);
         otherAggregationFunctionTargets.add(aggFunctionTarget);
       }
@@ -224,7 +478,7 @@ public class DistinctGroupbyBuilder {
     return baseDistinctNode;
   }
 
-  public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan,
+  public DistinctGroupbyNode[] createTwoPhaseDistinctNode(LogicalPlan plan,
                                                                    GroupbyNode originGroupbyNode,
                                                                    DistinctGroupbyNode baseDistinctNode) {
     /*
@@ -456,6 +710,75 @@ public class DistinctGroupbyBuilder {
 
   }
 
+  private void setMultiStageAggregationEnforcer(
+      ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode,
+      ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode,
+      ExecutionBlock thirdStageBlock, DistinctGroupbyNode thirdStageDistinctNode) {
+    firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.FIRST_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.SECOND_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
+    int index = 0;
+    for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+      List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
+      for (Column column: groupbyNode.getGroupingColumns()) {
+        sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
+      }
+      sortSpecArrays.add( SortSpecArray.newBuilder()
+          .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID())
+          .addAllSortSpecs(sortSpecs).build());
+    }
+    thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.THRID_STAGE,
+        DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays);
+  }
+
+  private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+                                                         DistinctGroupbyNode firstPhaseGroupBy,
+                                                         DistinctGroupbyNode secondPhaseGroupBy) {
+    DataChannel lastDataChannel = null;
+
+    // It pushes down the first phase group-by operator into all child blocks.
+    //
+    // (second phase)    G (currentBlock)
+    //                  /|\
+    //                / / | \
+    // (first phase) G G  G  G (child block)
+
+    // They are already connected one another.
+    // So, we don't need to connect them again.
+    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+      if (firstPhaseGroupBy.isEmptyGrouping()) {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+      } else {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
+      }
+      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+      // Why must firstPhaseGroupby be copied?
+      //
+      // A groupby in each execution block can have different child.
+      // It affects groupby's input schema.
+      DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseGroupbyCopy);
+
+      // just keep the last data channel.
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = GlobalPlanner.buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
   static class DistinctGroupbyNodeBuildInfo {
     private GroupbyNode groupbyNode;
     private List<AggregationFunctionCallEval> aggFunctions = new ArrayList<AggregationFunctionCallEval>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
index b1e4bc3..47e8933 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -27,10 +27,15 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable {
   @Expose
+  private GroupbyNode groupbyPlan;
+
+  @Expose
   private List<GroupbyNode> groupByNodes;
 
   @Expose
@@ -42,6 +47,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
   @Expose
   private int[] resultColumnIds;
 
+  /** Aggregation Functions */
+  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+
   public DistinctGroupbyNode(int pid) {
     super(pid, NodeType.DISTINCT_GROUP_BY);
   }
@@ -59,7 +67,11 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
 
   @Override
   public Target[] getTargets() {
-    return new Target[0];
+    if (hasTargets()) {
+      return targets;
+    } else {
+      return new Target[0];
+    }
   }
 
   public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
@@ -86,6 +98,18 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
     this.resultColumnIds = resultColumnIds;
   }
 
+  public AggregationFunctionCallEval [] getAggFunctions() {
+    return this.aggrFunctions;
+  }
+
+  public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    this.aggrFunctions = evals;
+  }
+
+  public void setGroupbyPlan(GroupbyNode groupbyPlan) { this.groupbyPlan = groupbyPlan; }
+
+  public GroupbyNode getGroupbyPlan() { return this.groupbyPlan; }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
@@ -113,6 +137,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
       }
     }
 
+    if (groupbyPlan != null) {
+      cloneNode.groupbyPlan = (GroupbyNode)groupbyPlan.clone();
+    }
     return cloneNode;
   }
 
@@ -200,4 +227,27 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
 
     return planStr;
   }
+
+  public Column[] getFirstStageShuffleKeyColumns() {
+    List<Column> shuffleKeyColumns = new ArrayList<Column>();
+    shuffleKeyColumns.add(getOutSchema().getColumn(0));   //distinctseq column
+    if (groupingColumns != null) {
+      for (Column eachColumn: groupingColumns) {
+        if (!shuffleKeyColumns.contains(eachColumn)) {
+          shuffleKeyColumns.add(eachColumn);
+        }
+      }
+    }
+    for (GroupbyNode eachGroupbyNode: groupByNodes) {
+      if (eachGroupbyNode.getGroupingColumns() != null && eachGroupbyNode.getGroupingColumns().length > 0) {
+        for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+          if (!shuffleKeyColumns.contains(eachColumn)) {
+            shuffleKeyColumns.add(eachColumn);
+          }
+        }
+      }
+    }
+
+    return shuffleKeyColumns.toArray(new Column[]{});
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
new file mode 100644
index 0000000..7201ed4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Int2Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * This class incremented each row to more rows by grouping columns. In addition, the operator must creates each row
+ * because of aggregation non-distinct columns.
+ *
+ * For example, there is a query as follows:
+ *  select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ *         count(distinct l_partkey), sum(l_orderkey)
+ *  from lineitem
+ *  group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ *  If you execute above query on tajo, FileScanner makes tuples after scanning raw data as follows:
+ *
+ *  -----------------------------------------------------------------------------
+ *  l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_orderkey, l_partkey
+ *  -----------------------------------------------------------------------------
+ *  1, N, O, 1996-03-13, 1, 1
+ *  2, N, O, 1996-04-12, 1, 1
+ *  1, N, O, 1997-01-28, 2, 2
+ *  1, R, F, 1994-02-02, 3, 2
+ *  2, R, F, 1993-11-09, 3, 3
+ *  
+ *  And then the scanner will push it as input data to this class. After then, this class will makes output data as
+ *  follows:
+ *
+ *  -------------------------------------------------------------------------------------------------------------------
+ *  NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ *  l_orderkey for distinct, l_orderkey for nondistinct
+ *  -------------------------------------------------------------------------------------------------------------------
+ *  0, 2, R, F, 1993-11-09, 3, NULL, 3
+ *  0, 2, N, O, 1996-04-12, 1, NULL, 1
+ *  0, 1, N, O, 1997-01-28, 2, NULL, 2
+ *  0, 1, R, F, 1994-02-02, 2, NULL, 3
+ *  0, 1, N, O, 1996-03-13, 1, NULL, 1
+ *  1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ *  1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ *  1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ *  1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ *  1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ *  For reference, NodeSequence means GroupByNode sequence. In this case, there are two GroupByNode. And it consist
+ *  of lineitem.l_partkey and lineitem.l_orderkey. The NodeSequence of lineitem.l_partkey is zero and the sequence of
+ *  lineitem.l_orderkey is one. As above output data, If there are uncomfortable column for DistinctGroupBy, 
+ *  inner aggregator makes it to NullDataTum.
+ *  
+ *  In addition, columns for NonDistinctGroupBy only can contains real value at first NodeSequence.
+ *
+ */
+
+public class DistinctGroupbyFirstAggregationExec extends PhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class);
+
+  private DistinctGroupbyNode plan;
+  private boolean finished = false;
+  private boolean preparedData = false;
+  private PhysicalExec child;
+
+  private long totalNumRows;
+  private int fetchedRows;
+  private float progress;
+
+  private int[] groupingKeyIndexes;
+  private NonDistinctHashAggregator nonDistinctHashAggregator;
+  private DistinctHashAggregator[] distinctAggregators;
+
+  private int resultTupleLength;
+
+  public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.child = subOp;
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    child.init();
+
+    // finding grouping column index
+    Column[] groupingColumns = plan.getGroupingColumns();
+    groupingKeyIndexes = new int[groupingColumns.length];
+
+    int index = 0;
+    for (Column col: groupingColumns) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexes[index++] = keyIndex;
+    }
+    resultTupleLength = groupingKeyIndexes.length + 1;  //1 is Sequence Datum which indicates sequence of DistinctNode.
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>();
+    int distinctSeq = 0;
+    for (GroupbyNode eachGroupby: groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby);
+        aggregator.setNodeSequence(distinctSeq++);
+        distinctAggrList.add(aggregator);
+        resultTupleLength += aggregator.getTupleLength();
+      } else {
+        nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby);
+        resultTupleLength += nonDistinctHashAggregator.getTupleLength();
+      }
+    }
+    distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{});
+  }
+
+  private int currentAggregatorIndex = 0;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (!preparedData) {
+      prepareInputData();
+    }
+
+    int prevIndex = currentAggregatorIndex;
+    while (!context.isStopped()) {
+      DistinctHashAggregator aggregator = distinctAggregators[currentAggregatorIndex];
+      Tuple result = aggregator.next();
+      if (result != null) {
+        return result;
+      }
+      currentAggregatorIndex++;
+      currentAggregatorIndex = currentAggregatorIndex % distinctAggregators.length;
+      if (currentAggregatorIndex == prevIndex) {
+        finished = true;
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  private void prepareInputData() throws IOException {
+    Tuple tuple = null;
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+      Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
+      for (int i = 0; i < groupingKeyIndexes.length; i++) {
+        groupingKey.put(i, tuple.get(groupingKeyIndexes[i]));
+      }
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        distinctAggregators[i].compute(groupingKey, tuple);
+      }
+      if (nonDistinctHashAggregator != null) {
+        nonDistinctHashAggregator.compute(groupingKey, tuple);
+      }
+    }
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+
+    totalNumRows = distinctAggregators[0].distinctAggrDatas.size();
+    preparedData = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    child.close();
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (finished) {
+      return progress;
+    } else {
+      if (totalNumRows > 0) {
+        return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f;
+      } else {
+        return progress;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() {
+    finished = false;
+    currentAggregatorIndex = 0;
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+  }
+
+  class NonDistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+    private int aggFunctionsNum;
+    private final AggregationFunctionCallEval aggFunctions[];
+
+    // GroupingKey -> FunctionContext[]
+    private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas;
+    private int tupleLength;
+
+    private Tuple dummyTuple;
+    private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>();
+
+      if (groupbyNode.hasAggFunctions()) {
+        aggFunctions = groupbyNode.getAggFunctions();
+        aggFunctionsNum = aggFunctions.length;
+        for (AggregationFunctionCallEval eachFunction: aggFunctions) {
+          eachFunction.setFirstPhase();
+        }
+      } else {
+        aggFunctions = new AggregationFunctionCallEval[0];
+        aggFunctionsNum = 0;
+      }
+
+      dummyTuple = new VTuple(aggFunctionsNum);
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        dummyTuple.put(i, NullDatum.get());
+      }
+      tupleLength = aggFunctionsNum;
+    }
+
+    public void compute(Tuple groupingKeyTuple, Tuple tuple) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple);
+      if (contexts != null) {
+        for (int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else { // if the key occurs firstly
+        contexts = new FunctionContext[aggFunctionsNum];
+        for (int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+        nonDistinctAggrDatas.put(groupingKeyTuple, contexts);
+      }
+    }
+
+    public Tuple aggregate(Tuple groupingKey) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey);
+      if (contexts == null) {
+        return null;
+      }
+      Tuple tuple = new VTuple(aggFunctionsNum);
+
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        tuple.put(i, aggFunctions[i].terminate(contexts[i]));
+      }
+
+      return tuple;
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+
+  class DistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+
+    // GroupingKey -> DistinctKey
+    private Map<Tuple, Set<Tuple>> distinctAggrDatas;
+    private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null;
+
+    private int nodeSequence;
+    private Int2Datum nodeSequenceDatum;
+
+    private int[] distinctKeyIndexes;
+
+    private int tupleLength;
+    private Tuple dummyTuple;
+    private boolean aggregatorFinished = false;
+
+    public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+      for (Integer eachIndex: groupingKeyIndexes) {
+        groupingKeyIndexSet.add(eachIndex);
+      }
+
+      List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>();
+      Column[] groupingColumns = groupbyNode.getGroupingColumns();
+      for (int idx = 0; idx < groupingColumns.length; idx++) {
+        Column col = groupingColumns[idx];
+        int keyIndex;
+        if (col.hasQualifier()) {
+          keyIndex = inSchema.getColumnId(col.getQualifiedName());
+        } else {
+          keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+        }
+        if (!groupingKeyIndexSet.contains(keyIndex)) {
+          distinctGroupingKeyIndexSet.add(keyIndex);
+        }
+      }
+      int index = 0;
+      this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()];
+      this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size());
+      for (Integer eachId : distinctGroupingKeyIndexSet) {
+        this.dummyTuple.put(index, NullDatum.get());
+        this.distinctKeyIndexes[index++] = eachId;
+      }
+
+      this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>();
+      this.tupleLength = distinctKeyIndexes.length;
+    }
+
+    public void setNodeSequence(int nodeSequence) {
+      this.nodeSequence = nodeSequence;
+      this.nodeSequenceDatum = new Int2Datum((short)nodeSequence);
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
+      Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
+      for (int i = 0; i < distinctKeyIndexes.length; i++) {
+        distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i]));
+      }
+
+      Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
+      if (distinctEntry == null) {
+        distinctEntry = new HashSet<Tuple>();
+        distinctAggrDatas.put(groupingKey, distinctEntry);
+      }
+      distinctEntry.add(distinctKeyTuple);
+    }
+
+    public void rescan() {
+      iterator = distinctAggrDatas.entrySet().iterator();
+      currentGroupingTuples = null;
+      groupingKeyChanged = false;
+      aggregatorFinished = false;
+    }
+
+    public void close() throws IOException {
+      distinctAggrDatas.clear();
+      distinctAggrDatas = null;
+      currentGroupingTuples = null;
+      iterator = null;
+    }
+
+    Entry<Tuple, Set<Tuple>> currentGroupingTuples;
+    Iterator<Tuple> distinctKeyIterator;
+    boolean groupingKeyChanged = false;
+
+    public Tuple next() {
+      if (aggregatorFinished) {
+        return null;
+      }
+      if (currentGroupingTuples == null) {
+        // first
+        if (!iterator.hasNext()) {
+          // Empty case
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      if (!distinctKeyIterator.hasNext()) {
+        if (!iterator.hasNext()) {
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      // node sequence, groupingKeys, 1'st distinctKeys, 2'st distinctKeys, ...
+      // If n'st == this.nodeSequence set with real data, otherwise set with NullDatum
+      Tuple tuple = new VTuple(resultTupleLength);
+      int tupleIndex = 0;
+      tuple.put(tupleIndex++, nodeSequenceDatum);
+
+      // merge grouping key
+      Tuple groupingKeyTuple = currentGroupingTuples.getKey();
+      int groupingKeyLength = groupingKeyTuple.size();
+      for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
+        tuple.put(tupleIndex, groupingKeyTuple.get(i));
+      }
+
+      // merge distinctKey
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        if (i == nodeSequence) {
+          Tuple distinctKeyTuple = distinctKeyIterator.next();
+          int distinctKeyLength = distinctKeyTuple.size();
+          for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
+            tuple.put(tupleIndex, distinctKeyTuple.get(j));
+          }
+        } else {
+          Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
+          int dummyTupleSize = dummyTuple.size();
+          for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
+            tuple.put(tupleIndex, dummyTuple.get(j));
+          }
+        }
+      }
+
+      // merge non distinct aggregation tuple
+      if (nonDistinctHashAggregator != null) {
+        Tuple nonDistinctTuple;
+        if (nodeSequence == 0 && groupingKeyChanged) {
+          groupingKeyChanged = false;
+          nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKeyTuple);
+          if (nonDistinctTuple == null) {
+            nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+          }
+        } else {
+          nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+        }
+        int tupleSize = nonDistinctTuple.size();
+        for (int j = 0; j < tupleSize; j++, tupleIndex++) {
+          tuple.put(tupleIndex, nonDistinctTuple.get(j));
+        }
+      }
+      return tuple;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
new file mode 100644
index 0000000..bc8885f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -0,0 +1,295 @@
+  /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class adjusts shuffle columns between DistinctGroupbyFirstAggregationExec and
+ * DistinctGroupbyThirdAggregationExec. It shuffled by grouping columns and aggregation columns. Because of the
+ * shuffle, more DistinctGroupbyThirdAggregationExec will execute compare than previous two distinct group by
+ * algorithm. And then, many DistinctGroupbyThirdAggregationExec improve the performance of count distinct query.
+ *
+ * For example, there is a query as follows:
+ *  select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ *         count(distinct l_partkey), sum(l_orderkey)
+ *  from lineitem
+ *  group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ *  In this case, execution plan for this operator will set shuffle type as follows:
+ *    Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4),
+ *      default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), default.lineitem.l_shipdate (TEXT),
+ *     default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), num=32)
+ *
+ *    Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4),
+ *      default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT),
+ *      default.lineitem.l_shipdate (TEXT), num=32)
+ *
+ *  For reference, input data and output data results as follows:
+ *
+ *  -------------------------------------------------------------------------------------------------------------------
+ *  NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ *  l_orderkey for distinct, l_orderkey for nondistinct
+ *  -------------------------------------------------------------------------------------------------------------------
+ *  0, 2, R, F, 1993-11-09, 3, NULL, 3
+ *  0, 2, N, O, 1996-04-12, 1, NULL, 1
+ *  0, 1, N, O, 1997-01-28, 2, NULL, 2
+ *  0, 1, R, F, 1994-02-02, 2, NULL, 3
+ *  0, 1, N, O, 1996-03-13, 1, NULL, 1
+ *  1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ *  1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ *  1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ *  1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ *  1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ */
+public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private int numGroupingColumns;
+  private int[][] distinctKeyIndexes;
+  private FunctionContext[] nonDistinctAggrContexts;
+  private AggregationFunctionCallEval[] nonDistinctAggrFunctions;
+  private int nonDistinctAggrTupleStartIndex = -1;
+
+  public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    // Finding distinct group by column index.
+    Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+    for (Column col: plan.getGroupingColumns()) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexSet.add(keyIndex);
+    }
+
+    int numDistinct = 0;
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        numDistinct++;
+      } else {
+        nonDistinctAggrFunctions = eachGroupby.getAggFunctions();
+        if (nonDistinctAggrFunctions != null) {
+          for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) {
+            eachFunction.setIntermediatePhase();
+          }
+          nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];
+        }
+      }
+    }
+
+    int index = 0;
+    distinctKeyIndexes = new int[numDistinct][];
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>();
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        for (int idx = 0; idx < distinctGroupingColumns.length; idx++) {
+          Column col = distinctGroupingColumns[idx];
+          int keyIndex;
+          if (col.hasQualifier()) {
+            keyIndex = inSchema.getColumnId(col.getQualifiedName());
+          } else {
+            keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+          }
+          if (!groupingKeyIndexSet.contains(keyIndex)) {
+            distinctGroupingKeyIndex.add(keyIndex);
+          }
+        }
+        int i = 0;
+        distinctKeyIndexes[index] = new int[distinctGroupingKeyIndex.size()];
+        for (int eachIdx : distinctGroupingKeyIndex) {
+          distinctKeyIndexes[index][i++] = eachIdx;
+        }
+        index++;
+      }
+    }
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrTupleStartIndex = inSchema.size() - nonDistinctAggrFunctions.length;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+  int prevSeq = -1;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple result = null;
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          return null;
+        }
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getKeyTuple(distinctSeq, tuple);
+
+      if (prevKeyTuple == null) {
+        // First
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          initNonDistinctAggrContext();
+          mergeNonDistinctAggr(tuple);
+        }
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+        continue;
+      }
+
+      if (!prevKeyTuple.equals(keyTuple)) {
+        // new grouping key
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          initNonDistinctAggrContext();
+          mergeNonDistinctAggr(tuple);
+        }
+        break;
+      } else {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          mergeNonDistinctAggr(tuple);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  private void initNonDistinctAggrContext() {
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];
+      for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+        nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext();
+      }
+    }
+  }
+
+  private void mergeNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, tuple);
+    }
+  }
+
+  private void terminatedNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      tuple.put(nonDistinctAggrTupleStartIndex + i, nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i]));
+    }
+  }
+
+  private Tuple getKeyTuple(int distinctSeq, Tuple tuple) {
+    int[] columnIndexes = distinctKeyIndexes[distinctSeq];
+
+    Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1);
+    keyTuple.put(0, tuple.get(0));
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i + 1, tuple.get(i + 1));
+    }
+    for (int i = 0; i < columnIndexes.length; i++) {
+      keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i]));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
new file mode 100644
index 0000000..239dabf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ *  This class aggregates the output of DistinctGroupbySecondAggregationExec.
+ *
+ */
+public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private DistinctFinalAggregator[] aggregators;
+  private DistinctFinalAggregator nonDistinctAggr;
+
+  private int resultTupleLength;
+  private int numGroupingColumns;
+
+  private int[] resultTupleIndexes;
+
+  public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+    resultTupleLength = numGroupingColumns;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
+    int inTupleIndex = 1 + numGroupingColumns;
+    int outTupleIndex = numGroupingColumns;
+    int distinctSeq = 0;
+
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
+        distinctSeq++;
+
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        inTupleIndex += distinctGroupingColumns.length;
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      } else {
+        nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      }
+      resultTupleLength += eachGroupby.getAggFunctions().length;
+    }
+    aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+
+    // make output schema mapping index
+    resultTupleIndexes = new int[outSchema.size()];
+    Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>();
+    int resultTupleIndex = 0;
+    for (Column eachColumn: plan.getGroupingColumns()) {
+      groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
+      resultTupleIndex++;
+    }
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      Set<Column> groupingColumnSet = new HashSet<Column>();
+      for (Column column: eachGroupby.getGroupingColumns()) {
+        groupingColumnSet.add(column);
+      }
+      for (Target eachTarget: eachGroupby.getTargets()) {
+        if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
+          //aggr function
+          groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex);
+          resultTupleIndex++;
+        }
+      }
+    }
+
+    int index = 0;
+    for (Column eachOutputColumn: outSchema.getColumns()) {
+      // If column is avg aggregation function, outschema's column type is float
+      // but groupbyResultTupleIndex's column type is protobuf
+
+      int matchedIndex = -1;
+      for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
+        if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) {
+          matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
+          break;
+        }
+      }
+      if (matchedIndex < 0) {
+        throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
+      }
+      resultTupleIndexes[matchedIndex] = index++;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple resultTuple = new VTuple(resultTupleLength);
+
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      // Last tuple
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          if (numGroupingColumns == 0) {
+            // No grouping column, return null tuple
+            return makeEmptyTuple();
+          } else {
+            return null;
+          }
+        }
+
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getGroupingKeyTuple(tuple);
+
+      // First tuple
+      if (prevKeyTuple == null) {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        continue;
+      }
+
+      if (!prevKeyTuple.equals(keyTuple)) {
+        // new grouping key
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        break;
+      } else {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        aggregators[distinctSeq].merge(tuple);
+      }
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple makeEmptyTuple() {
+    Tuple resultTuple = new VTuple(resultTupleLength);
+    for (DistinctFinalAggregator eachAggr: aggregators) {
+      eachAggr.terminateEmpty(resultTuple);
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple getGroupingKeyTuple(Tuple tuple) {
+    Tuple keyTuple = new VTuple(numGroupingColumns);
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i, tuple.get(i + 1));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  class DistinctFinalAggregator {
+    private FunctionContext[] functionContexts;
+    private AggregationFunctionCallEval[] aggrFunctions;
+    private int seq;
+    private int inTupleIndex;
+    private int outTupleIndex;
+    public DistinctFinalAggregator(int seq, int inTupleIndex, int outTupleIndex, GroupbyNode groupbyNode) {
+      this.seq = seq;
+      this.inTupleIndex = inTupleIndex;
+      this.outTupleIndex = outTupleIndex;
+
+      aggrFunctions = groupbyNode.getAggFunctions();
+      if (aggrFunctions != null) {
+        for (AggregationFunctionCallEval eachFunction: aggrFunctions) {
+          eachFunction.setFinalPhase();
+        }
+      }
+      newFunctionContext();
+    }
+
+    private void newFunctionContext() {
+      functionContexts = new FunctionContext[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        functionContexts[i] = aggrFunctions[i].newContext();
+      }
+    }
+
+    public void merge(Tuple tuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        aggrFunctions[i].merge(functionContexts[i], inSchema, tuple);
+      }
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) {
+          nonDistinctAggr.merge(tuple);
+        }
+      }
+    }
+
+    public void terminate(Tuple resultTuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      newFunctionContext();
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminate(resultTuple);
+      }
+    }
+
+    public void terminateEmpty(Tuple resultTuple) {
+      newFunctionContext();
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminateEmpty(resultTuple);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 4deddee..598054c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -33,10 +33,8 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
-import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -45,6 +43,8 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -799,13 +799,30 @@ public class Repartitioner {
     }
 
     int groupingColumns = 0;
-    GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
-    if (groupby != null) {
-      groupingColumns = groupby.getGroupingColumns().length;
-    } else {
-      DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
-      if (dGroupby != null) {
-        groupingColumns = dGroupby.getGroupingColumns().length;
+    LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+        new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
+    if (groupbyNodes != null && groupbyNodes.length > 0) {
+      LogicalNode bottomNode = groupbyNodes[0];
+      if (bottomNode.getType() == NodeType.GROUP_BY) {
+        groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length;
+      } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+        DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+        if (distinctNode == null) {
+          LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+          distinctNode = (DistinctGroupbyNode)bottomNode;
+        }
+        groupingColumns = distinctNode.getGroupingColumns().length;
+
+        Enforcer enforcer = execBlock.getEnforcer();
+        EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+        if (property != null) {
+          if (property.getDistinct().getIsMultipleAggregation()) {
+            MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+            if (stage != MultipleAggregationStage.THRID_STAGE) {
+              groupingColumns = distinctNode.getOutSchema().size();
+            }
+          }
+        }
       }
     }
     // get a proper number of tasks
@@ -1145,7 +1162,8 @@ public class Repartitioner {
 
     // set the partition number for group by and sort
     if (channel.getShuffleType() == HASH_SHUFFLE) {
-      if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+      if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
+          execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
         keys = channel.getShuffleKeys();
       }
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {