You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/10/08 04:37:12 UTC
[1/2] TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
Repository: tajo
Updated Branches:
refs/heads/master de28c8294 -> 0dfa3972c
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 919ac9b..1f348ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,13 +43,18 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -810,9 +815,30 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
if (grpNode.getType() == NodeType.GROUP_BY) {
hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
} else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
+ // Find current distinct stage node.
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+ if (distinctNode == null) {
+ LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+ distinctNode = (DistinctGroupbyNode)grpNode;
+ }
+ hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+ Enforcer enforcer = subQuery.getBlock().getEnforcer();
+ if (enforcer == null) {
+ LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null.");
+ }
+ EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+ if (property != null) {
+ if (property.getDistinct().getIsMultipleAggregation()) {
+ MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+ if (stage != MultipleAggregationStage.THRID_STAGE) {
+ hasGroupColumns = true;
+ }
+ }
+ }
}
if (!hasGroupColumns) {
+ LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
return 1;
} else {
long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bde2459..2760301 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -315,6 +315,12 @@ message DistinctGroupbyEnforcer {
SORT_AGGREGATION = 1;
}
+ enum MultipleAggregationStage {
+ FIRST_STAGE = 0;
+ SECOND_STAGE = 1;
+ THRID_STAGE = 3;
+ }
+
message SortSpecArray {
required int32 pid = 1;
repeated SortSpecProto sortSpecs = 2;
@@ -322,6 +328,8 @@ message DistinctGroupbyEnforcer {
required int32 pid = 1;
required DistinctAggregationAlgorithm algorithm = 2;
repeated SortSpecArray sortSpecArrays = 3;
+ required bool isMultipleAggregation = 4 [default = false];
+ optional MultipleAggregationStage multipleAggregationStage = 5;
}
message EnforcerProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index fccec26..8b9f9f7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -20,10 +20,7 @@ package org.apache.tajo.engine.query;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -34,9 +31,14 @@ import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TajoWorker;
+import org.junit.AfterClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.sql.ResultSet;
import java.util.*;
@@ -44,11 +46,33 @@ import java.util.*;
import static org.junit.Assert.*;
@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
public class TestGroupByQuery extends QueryTestCaseBase {
private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
- public TestGroupByQuery() throws Exception {
+ public TestGroupByQuery(String groupByOption) throws Exception {
super(TajoConstants.DEFAULT_DATABASE_NAME);
+
+ Map<String, String> variables = new HashMap<String, String>();
+ if (groupByOption.equals("MultiLevel")) {
+ variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "true");
+ } else {
+ variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false");
+ }
+ client.updateSessionVariables(variables);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ client.unsetSessionVariables(TUtil.newList(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname()));
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {"MultiLevel"},
+ {"No-MultiLevel"},
+ });
}
@Test
@@ -285,6 +309,24 @@ public class TestGroupByQuery extends QueryTestCaseBase {
}
@Test
+ public final void testDistinctAggregation8() throws Exception {
+ /*
+ select
+ sum(distinct l_orderkey),
+ l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ count(distinct l_partkey),
+ sum(l_orderkey)
+ from
+ lineitem
+ group by
+ l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ */
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+
+ @Test
public final void testDistinctAggregationWithHaving1() throws Exception {
// select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem
// group by l_linenumber having sum(distinct l_orderkey) >= 6;
@@ -343,6 +385,14 @@ public class TestGroupByQuery extends QueryTestCaseBase {
assertResultSet(res, "testDistinctAggregation_case8.result");
res.close();
+ res = executeFile("testDistinctAggregation_case9.sql");
+ assertResultSet(res, "testDistinctAggregation_case9.result");
+ res.close();
+
+ res = executeFile("testDistinctAggregation_case10.sql");
+ assertResultSet(res, "testDistinctAggregation_case10.result");
+ res.close();
+
// case9
KeyValueSet tableOptions = new KeyValueSet();
tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
new file mode 100644
index 0000000..0553d06
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
@@ -0,0 +1,9 @@
+select
+ sum(distinct l_orderkey),
+ l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ count(distinct l_partkey),
+ sum(l_orderkey)
+from
+ lineitem
+group by
+ l_linenumber, l_returnflag, l_linestatus, l_shipdate;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
new file mode 100644
index 0000000..6ab7c25
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
@@ -0,0 +1,5 @@
+select sum(cnt1), sum(sum2)
+from (
+ select o_orderdate, count(distinct o_orderpriority), count(distinct o_orderkey) cnt1, sum(o_totalprice) sum2
+ from orders group by o_orderdate
+) a
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
new file mode 100644
index 0000000..6265599
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
@@ -0,0 +1,11 @@
+select
+ lineitem.l_orderkey as l_orderkey,
+ count(distinct lineitem.l_partkey) as cnt1,
+ sum(lineitem.l_quantity + lineitem.l_linenumber)/count(distinct lineitem.l_suppkey) as value2,
+ lineitem.l_partkey as l_partkey,
+ avg(lineitem.l_quantity) as avg1,
+ count(distinct lineitem.l_suppkey) as cnt2
+from
+ lineitem
+group by
+ lineitem.l_orderkey, lineitem.l_partkey
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
new file mode 100644
index 0000000..519390d
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
@@ -0,0 +1,7 @@
+?sum,l_linenumber,l_returnflag,l_linestatus,l_shipdate,?count_1,?sum_2
+-------------------------------
+1,1,N,O,1996-03-13,1,1
+2,1,N,O,1997-01-28,1,2
+3,1,R,F,1994-02-02,1,3
+1,2,N,O,1996-04-12,1,1
+3,2,R,F,1993-11-09,1,3
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
new file mode 100644
index 0000000..2035d9f
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
@@ -0,0 +1,3 @@
+?sum,?sum_1
+-------------------------------
+3,414440.9
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
new file mode 100644
index 0000000..506eea0
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
@@ -0,0 +1,6 @@
+l_orderkey,cnt1,value2,l_partkey,avg1,cnt2
+-------------------------------
+1,1,28.0,1,26.5,2
+2,1,39.0,2,38.0,1
+3,1,46.0,2,45.0,1
+3,1,51.0,3,49.0,1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1..25f1ae7 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
\set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
\set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
\set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
\set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
\set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
\set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a4..084c105 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -42,10 +42,6 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
@SuppressWarnings("unused")
private final boolean[] nullFirsts;
- private Datum left;
- private Datum right;
- private int compVal;
-
/**
* @param schema The schema of input tuples
* @param sortKeys The description of sort keys
@@ -88,6 +84,10 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
@Override
public int compare(Tuple tuple1, Tuple tuple2) {
+ Datum left = null;
+ Datum right = null;
+ int compVal = 0;
+
for (int i = 0; i < sortKeyIds.length; i++) {
left = tuple1.get(sortKeyIds[i]);
right = tuple2.get(sortKeyIds[i]);
[2/2] git commit: TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
Posted by bl...@apache.org.
TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
Closes #136
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dfa3972
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dfa3972
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dfa3972
Branch: refs/heads/master
Commit: 0dfa3972c6a52d785b8e55f91d0906456a3926b3
Parents: de28c82
Author: Jaehwa Jung <bl...@apache.org>
Authored: Wed Oct 8 11:35:45 2014 +0900
Committer: Jaehwa Jung <bl...@apache.org>
Committed: Wed Oct 8 11:35:45 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../main/java/org/apache/tajo/SessionVars.java | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
.../eval/AggregationFunctionCallEval.java | 38 +-
.../engine/planner/PhysicalPlannerImpl.java | 56 ++-
.../tajo/engine/planner/enforce/Enforcer.java | 13 +
.../engine/planner/global/GlobalPlanner.java | 58 +--
.../global/builder/DistinctGroupbyBuilder.java | 329 ++++++++++++-
.../planner/logical/DistinctGroupbyNode.java | 52 +-
.../DistinctGroupbyFirstAggregationExec.java | 476 +++++++++++++++++++
.../DistinctGroupbySecondAggregationExec.java | 295 ++++++++++++
.../DistinctGroupbyThirdAggregationExec.java | 304 ++++++++++++
.../tajo/master/querymaster/Repartitioner.java | 42 +-
.../tajo/master/querymaster/SubQuery.java | 28 +-
.../src/main/proto/TajoWorkerProtocol.proto | 8 +
.../tajo/engine/query/TestGroupByQuery.java | 60 ++-
.../testDistinctAggregation8.sql | 9 +
.../testDistinctAggregation_case10.sql | 5 +
.../testDistinctAggregation_case9.sql | 11 +
.../testDistinctAggregation8.result | 7 +
.../testDistinctAggregation_case10.result | 3 +
.../testDistinctAggregation_case9.result | 6 +
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../apache/tajo/storage/TupleComparator.java | 8 +-
24 files changed, 1735 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index bff9583..7aa7a0c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
+
TAJO-1093: DateTimeFormat.to_char() is slower than SimpleDateFormat.format().
(Jihun Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index cc875b2..1229849 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey {
TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
"shuffle output size for partition table write (mb)", DEFAULT),
+ GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT),
+
// for physical Executors
EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f9f5e4a..66d3030 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -316,6 +316,8 @@ public class TajoConf extends Configuration {
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),
+ $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
+
// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
index ab18aa9..3216519 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -30,7 +30,10 @@ import org.apache.tajo.storage.VTuple;
public class AggregationFunctionCallEval extends FunctionEval implements Cloneable {
@Expose protected AggFunction instance;
- @Expose boolean firstPhase = false;
+ @Expose boolean intermediatePhase = false;
+ @Expose boolean finalPhase = true;
+ @Expose String alias;
+
private Tuple params;
protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) {
@@ -58,7 +61,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
}
}
- if (firstPhase) {
+ if (!intermediatePhase && !finalPhase) {
+ // firstPhase
instance.eval(context, params);
} else {
instance.merge(context, params);
@@ -71,7 +75,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
}
public Datum terminate(FunctionContext context) {
- if (firstPhase) {
+ if (!finalPhase) {
return instance.getPartialResult(context);
} else {
return instance.terminate(context);
@@ -80,18 +84,40 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
@Override
public DataType getValueType() {
- if (firstPhase) {
+ if (!finalPhase) {
return instance.getPartialResultType();
} else {
return funcDesc.getReturnType();
}
}
+ public void setAlias(String alias) { this.alias = alias; }
+
+ public String getAlias() { return this.alias; }
+
public Object clone() throws CloneNotSupportedException {
- return super.clone();
+ AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone();
+
+ clone.finalPhase = finalPhase;
+ clone.intermediatePhase = intermediatePhase;
+ clone.alias = alias;
+ clone.instance = (AggFunction)instance.clone();
+
+ return clone;
}
public void setFirstPhase() {
- this.firstPhase = true;
+ this.finalPhase = false;
+ this.intermediatePhase = false;
+ }
+
+ public void setFinalPhase() {
+ this.finalPhase = true;
+ this.intermediatePhase = false;
+ }
+
+ public void setIntermediatePhase() {
+ this.finalPhase = false;
+ this.intermediatePhase = true;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2730202..6b1c65c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageConstants;
@@ -56,6 +57,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
@@ -1047,17 +1049,61 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
- DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
- if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
- return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+ if (property.getDistinct().getIsMultipleAggregation()) {
+ MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+
+ if (stage == MultipleAggregationStage.FIRST_STAGE) {
+ return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp);
+ } else if (stage == MultipleAggregationStage.SECOND_STAGE) {
+ return new DistinctGroupbySecondAggregationExec(context, distinctNode,
+ createSortExecForDistinctGroupby(context, distinctNode, subOp, 2));
+ } else {
+ return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
+ createSortExecForDistinctGroupby(context, distinctNode, subOp, 3));
+ }
} else {
- return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+ DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
+ if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+ return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+ } else {
+ return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+ }
}
} else {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
}
}
+ private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
+ DistinctGroupbyNode distinctNode,
+ PhysicalExec subOp,
+ int phase) throws IOException {
+ SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+ //2 phase: seq, groupby columns, distinct1 keys, distinct2 keys,
+ //3 phase: groupby columns, seq, distinct1 keys, distinct2 keys,
+ List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
+ if (phase == 2) {
+ sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+ }
+ for (Column eachColumn: distinctNode.getGroupingColumns()) {
+ sortSpecs.add(new SortSpec(eachColumn));
+ }
+ if (phase == 3) {
+ sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+ }
+ for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
+ for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+ sortSpecs.add(new SortSpec(eachColumn));
+ }
+ }
+ sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{}));
+ sortNode.setInSchema(distinctNode.getInSchema());
+ sortNode.setOutSchema(distinctNode.getInSchema());
+ ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp);
+
+ return sortExec;
+ }
+
private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
@@ -1145,7 +1191,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
- private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+ public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
if (enforcer == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 031569e..e2d7744 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.util.TUtil;
@@ -135,13 +136,25 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
public void enforceDistinctAggregation(int pid,
DistinctAggregationAlgorithm algorithm,
List<SortSpecArray> sortSpecArrays) {
+ enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays);
+ }
+
+ public void enforceDistinctAggregation(int pid,
+ boolean isMultipleAggregation,
+ MultipleAggregationStage stage,
+ DistinctAggregationAlgorithm algorithm,
+ List<SortSpecArray> sortSpecArrays) {
EnforceProperty.Builder builder = newProperty();
DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
enforce.setPid(pid);
+ enforce.setIsMultipleAggregation(isMultipleAggregation);
enforce.setAlgorithm(algorithm);
if (sortSpecArrays != null) {
enforce.addAllSortSpecArrays(sortSpecArrays);
}
+ if (stage != null) {
+ enforce.setMultipleAggregationStage(stage);
+ }
builder.setType(EnforceType.DISTINCT_GROUP_BY);
builder.setDistinct(enforce.build());
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 432589b..01e02d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -658,47 +658,6 @@ public class GlobalPlanner {
return rewritten;
}
- public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
- DistinctGroupbyNode firstPhaseGroupBy,
- DistinctGroupbyNode secondPhaseGroupBy) {
- DataChannel lastDataChannel = null;
-
- // It pushes down the first phase group-by operator into all child blocks.
- //
- // (second phase) G (currentBlock)
- // /|\
- // / / | \
- // (first phase) G G G G (child block)
-
- // They are already connected one another.
- // So, we don't need to connect them again.
- for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
- if (firstPhaseGroupBy.isEmptyGrouping()) {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
- } else {
- dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
- }
- dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
- ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-
- // Why must firstPhaseGroupby be copied?
- //
- // A groupby in each execution block can have different child.
- // It affects groupby's input schema.
- DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
- firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
- childBlock.setPlan(firstPhaseGroupbyCopy);
-
- // just keep the last data channel.
- lastDataChannel = dataChannel;
- }
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
- secondPhaseGroupBy.setChild(scanNode);
- lastBlock.setPlan(secondPhaseGroupBy);
- return lastBlock;
- }
-
/**
* If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows:
*
@@ -824,8 +783,20 @@ public class GlobalPlanner {
ExecutionBlock currentBlock;
if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
- DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
- return builder.buildPlan(context, lastBlock, groupbyNode);
+ boolean multiLevelEnabled = context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED);
+
+ if (multiLevelEnabled) {
+ if (PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) {
+ DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+ return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode);
+ } else {
+ DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+ return builder.buildPlan(context, lastBlock, groupbyNode);
+ }
+ } else {
+ DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+ return builder.buildPlan(context, lastBlock, groupbyNode);
+ }
} else {
GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
@@ -968,6 +939,7 @@ public class GlobalPlanner {
firstPhaseEvals[i].setFirstPhase();
firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]);
FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+ secondPhaseEvals[i].setFinalPhase();
secondPhaseEvals[i].setArgs(new EvalNode[] {param});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 8727b84..cbe2d7e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -36,11 +37,14 @@ import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.util.TUtil;
@@ -56,6 +60,255 @@ public class DistinctGroupbyBuilder {
this.globalPlanner = globalPlanner;
}
+  /**
+   * Builds a three-stage execution plan for a GROUP BY that contains at least one DISTINCT
+   * aggregation.
+   *
+   * <p>{@code latestExecBlock} becomes the first stage. Two new execution blocks are created for the
+   * second and third stages and connected with hash-shuffle data channels:
+   * <pre>
+   *   latestExecBlock --(32 partitions, first-stage shuffle keys)--> second stage
+   *   second stage    --(GROUP BY columns; 1 partition if there are none, else 32)--> third stage
+   * </pre>
+   *
+   * @param context the global planning context
+   * @param latestExecBlock the block whose plan becomes the first-stage DistinctGroupbyNode
+   * @param currentNode the original group-by node; it is cast to {@link GroupbyNode}
+   * @return the third-stage execution block, which produces the final aggregation result
+   * @throws PlanningException wrapping any exception raised while building the plan
+   */
+  public ExecutionBlock buildMultiLevelPlan(GlobalPlanContext context,
+                                            ExecutionBlock latestExecBlock,
+                                            LogicalNode currentNode) throws PlanningException {
+    try {
+      GroupbyNode groupbyNode = (GroupbyNode) currentNode;
+
+      LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+      DistinctGroupbyNode baseDistinctNode =
+          buildMultiLevelBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
+      baseDistinctNode.setGroupbyPlan(groupbyNode);
+
+      // Set total Aggregation Functions.
+      AggregationFunctionCallEval[] aggFunctions =
+          new AggregationFunctionCallEval[groupbyNode.getAggFunctions().length];
+
+      // If there is no grouping column we can't find the column alias elsewhere, so each
+      // aggregation takes its alias from the GroupbyNode output schema column at the same position.
+      // Nothing below changes this condition, so it is evaluated once.
+      boolean aliasFromOutSchema = groupbyNode.getGroupingColumns().length == 0
+          && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size();
+
+      for (int i = 0; i < aggFunctions.length; i++) {
+        aggFunctions[i] = (AggregationFunctionCallEval) groupbyNode.getAggFunctions()[i].clone();
+        aggFunctions[i].setFirstPhase();
+        if (aliasFromOutSchema) {
+          aggFunctions[i].setAlias(groupbyNode.getOutSchema().getColumn(i).getQualifiedName());
+        }
+      }
+
+      if (aliasFromOutSchema) {
+        groupbyNode.setAggFunctions(aggFunctions);
+      }
+
+      baseDistinctNode.setAggFunctions(aggFunctions);
+
+      // Create the first, second and third stage nodes as clones of the base node.
+      DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode secondStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+      DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+
+      // The non-distinct aggregations, if any, are held by the last child GroupbyNode. In the second
+      // and third stages their arguments are replaced with FieldEvals over their target columns.
+      GroupbyNode lastGroupbyNode =
+          secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setIntermediatePhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+      lastGroupbyNode =
+          thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setFirstPhase();
+          aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+
+      // Set in & out schema for each DistinctGroupbyNode.
+      secondStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      secondStageDistinctNode.setOutSchema(firstStageDistinctNode.getOutSchema());
+      thirdStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      thirdStageDistinctNode.setOutSchema(groupbyNode.getOutSchema());
+
+      // The existing block runs the first stage.
+      latestExecBlock.setPlan(firstStageDistinctNode);
+
+      ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock();
+      ExecutionBlock thirdStageBlock = context.getPlan().newExecutionBlock();
+
+      setMultiStageAggregationEnforcer(latestExecBlock, firstStageDistinctNode, secondStageBlock,
+          secondStageDistinctNode, thirdStageBlock, thirdStageDistinctNode);
+
+      // Data channel: first stage -> second stage.
+      DataChannel firstChannel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, 32);
+      firstChannel.setShuffleKeys(firstStageDistinctNode.getFirstStageShuffleKeyColumns());
+      firstChannel.setSchema(firstStageDistinctNode.getOutSchema());
+      firstChannel.setStoreType(globalPlanner.getStoreType());
+
+      ScanNode scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), firstChannel);
+      secondStageDistinctNode.setChild(scanNode);
+      secondStageBlock.setPlan(secondStageDistinctNode);
+      context.getPlan().addConnect(firstChannel);
+
+      // Data channel: second stage -> third stage. The shuffle keys are the GROUP BY columns in both
+      // cases; only the partition count differs (one partition when there are no grouping columns).
+      int secondChannelPartitions = groupbyNode.isEmptyGrouping() ? 1 : 32;
+      DataChannel secondChannel =
+          new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, secondChannelPartitions);
+      secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns());
+      secondChannel.setSchema(secondStageDistinctNode.getOutSchema());
+      secondChannel.setStoreType(globalPlanner.getStoreType());
+
+      scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), secondChannel);
+      thirdStageDistinctNode.setChild(scanNode);
+      thirdStageBlock.setPlan(thirdStageDistinctNode);
+      context.getPlan().addConnect(secondChannel);
+
+      if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
+        buildDistinctGroupbyAndUnionPlan(
+            context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode);
+      }
+
+      return thirdStageBlock;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new PlanningException(e);
+    }
+  }
+
+  /**
+   * Builds the DistinctGroupbyNode template that the three stages of the multi-level plan are
+   * cloned from.
+   *
+   * <pre>
+   * select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+   * => DistinctGroupbyNode
+   *      grouping key = col1
+   *      Sub GroupbyNodes
+   *        - GroupByNode1: grouping(col2), expr(count distinct col2)
+   *        - GroupByNode2: grouping(col3), expr(count distinct col3)
+   *        - GroupByNode3: expr(sum col4)        (always last, only if non-distinct aggs exist)
+   * </pre>
+   *
+   * <p>Side effect: every aggregation function of {@code groupbyNode} is switched to first phase in
+   * place.
+   *
+   * @param context the global planning context (supplies the logical plan and new PIDs)
+   * @param latestExecBlock not used by this method
+   * @param groupbyNode the original group-by node
+   * @return the base DistinctGroupbyNode with targets, grouping columns, schema, child and sub-nodes set
+   */
+  private DistinctGroupbyNode buildMultiLevelBaseDistinctGroupByNode(GlobalPlanContext context,
+                                                                     ExecutionBlock latestExecBlock,
+                                                                     GroupbyNode groupbyNode) {
+    LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+    List<Column> originalGroupingColumns = Arrays.asList(groupbyNode.getGroupingColumns());
+
+    List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>();
+
+    List<AggregationFunctionCallEval> otherAggregationFunctionCallEvals = new ArrayList<AggregationFunctionCallEval>();
+    List<Target> otherAggregationFunctionTargets = new ArrayList<Target>();
+
+    // distinct column set (as a string key) -> build info of the GroupbyNode handling that set
+    Map<String, DistinctGroupbyNodeBuildInfo> distinctNodeBuildInfos = new HashMap<String, DistinctGroupbyNodeBuildInfo>();
+    AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions();
+    for (int aggIdx = 0; aggIdx < aggFunctions.length; aggIdx++) {
+      AggregationFunctionCallEval aggFunction = aggFunctions[aggIdx];
+      aggFunction.setFirstPhase();
+      // The original targets are laid out as grouping columns followed by aggregations.
+      Target originAggFunctionTarget = groupbyNode.getTargets()[originalGroupingColumns.size() + aggIdx];
+      Target aggFunctionTarget =
+          new Target(new FieldEval(originAggFunctionTarget.getEvalTree().getName(), aggFunction.getValueType()));
+
+      if (aggFunction.isDistinct()) {
+        // Create or reuse a GroupbyNode for each distinct column set.
+        LinkedHashSet<Column> groupbyUniqColumns = EvalTreeUtil.findUniqueColumns(aggFunction);
+        String groupbyMapKey = EvalTreeUtil.columnsToStr(groupbyUniqColumns);
+        DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(groupbyMapKey);
+        if (buildInfo == null) {
+          GroupbyNode distinctGroupbyNode = new GroupbyNode(plan.newPID());
+          buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode);
+          distinctNodeBuildInfos.put(groupbyMapKey, buildInfo);
+
+          // A distinct sub-node groups only by its distinct columns; the GROUP BY clause's columns are
+          // set on the enclosing DistinctGroupbyNode (setGroupColumns at the end of this method).
+          // The LinkedHashSet is already ordered and duplicate-free, so it is copied as is.
+          List<Column> groupingColumns = new ArrayList<Column>(groupbyUniqColumns);
+          distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[]{}));
+        }
+        buildInfo.addAggFunction(aggFunction);
+        buildInfo.addAggFunctionTarget(aggFunctionTarget);
+      } else {
+        otherAggregationFunctionCallEvals.add(aggFunction);
+        otherAggregationFunctionTargets.add(aggFunctionTarget);
+      }
+    }
+
+    // Base targets: the distinct sequence column, then the GROUP BY columns, then the columns of
+    // every sub-node appended below.
+    List<Target> baseGroupByTargets = new ArrayList<Target>();
+    baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", Type.INT2))));
+    for (Column column : originalGroupingColumns) {
+      baseGroupByTargets.add(new Target(new FieldEval(column)));
+    }
+
+    // Finish one child GroupbyNode per distinct column set.
+    for (DistinctGroupbyNodeBuildInfo buildInfo : distinctNodeBuildInfos.values()) {
+      GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode();
+      List<AggregationFunctionCallEval> groupbyAggFunctions = buildInfo.getAggFunctions();
+
+      Target[] targets = new Target[eachGroupbyNode.getGroupingColumns().length + groupbyAggFunctions.size()];
+      int targetIdx = 0;
+
+      for (Column column : eachGroupbyNode.getGroupingColumns()) {
+        Target target = new Target(new FieldEval(column));
+        targets[targetIdx++] = target;
+        baseGroupByTargets.add(target);
+      }
+      for (Target eachAggFunctionTarget: buildInfo.getAggFunctionTargets()) {
+        targets[targetIdx++] = eachAggFunctionTarget;
+      }
+      eachGroupbyNode.setTargets(targets);
+      eachGroupbyNode.setAggFunctions(groupbyAggFunctions.toArray(new AggregationFunctionCallEval[]{}));
+      eachGroupbyNode.setDistinct(true);
+      eachGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(eachGroupbyNode);
+    }
+
+    // Merge all non-distinct aggregation functions into one GroupBy node, added last.
+    if (!otherAggregationFunctionCallEvals.isEmpty()) {
+      // finally this aggregation output tuple's order is GROUP_BY_COL1, COL2, .... + AGG_VALUE, SUM_VALUE, ...
+      GroupbyNode otherGroupbyNode = new GroupbyNode(plan.newPID());
+
+      Target[] targets = new Target[otherAggregationFunctionTargets.size()];
+      int targetIdx = 0;
+      for (Target eachTarget : otherAggregationFunctionTargets) {
+        targets[targetIdx++] = eachTarget;
+        baseGroupByTargets.add(eachTarget);
+      }
+
+      otherGroupbyNode.setTargets(targets);
+      otherGroupbyNode.setGroupingColumns(new Column[]{});
+      otherGroupbyNode.setAggFunctions(otherAggregationFunctionCallEvals.toArray(new AggregationFunctionCallEval[]{}));
+      otherGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(otherGroupbyNode);
+    }
+
+    DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(plan.newPID());
+    baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{}));
+    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setInSchema(groupbyNode.getInSchema());
+    baseDistinctNode.setChild(groupbyNode.getChild());
+
+    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+
+    return baseDistinctNode;
+  }
public ExecutionBlock buildPlan(GlobalPlanContext context,
ExecutionBlock latestExecBlock,
@@ -66,7 +319,7 @@ public class DistinctGroupbyBuilder {
DistinctGroupbyNode baseDistinctNode = buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
// Create First, SecondStage's Node using baseNode
- DistinctGroupbyNode[] distinctNodes = createMultiPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
+ DistinctGroupbyNode[] distinctNodes = createTwoPhaseDistinctNode(plan, groupbyNode, baseDistinctNode);
DistinctGroupbyNode firstStageDistinctNode = distinctNodes[0];
DistinctGroupbyNode secondStageDistinctNode = distinctNodes[1];
@@ -100,7 +353,7 @@ public class DistinctGroupbyBuilder {
context.getPlan().addConnect(channel);
if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
- globalPlanner.buildDistinctGroupbyAndUnionPlan(
+ buildDistinctGroupbyAndUnionPlan(
context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode);
}
@@ -162,6 +415,7 @@ public class DistinctGroupbyBuilder {
buildInfo.addAggFunction(aggFunction);
buildInfo.addAggFunctionTarget(aggFunctionTarget);
} else {
+ aggFunction.setFinalPhase();
otherAggregationFunctionCallEvals.add(aggFunction);
otherAggregationFunctionTargets.add(aggFunctionTarget);
}
@@ -224,7 +478,7 @@ public class DistinctGroupbyBuilder {
return baseDistinctNode;
}
- public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan,
+ public DistinctGroupbyNode[] createTwoPhaseDistinctNode(LogicalPlan plan,
GroupbyNode originGroupbyNode,
DistinctGroupbyNode baseDistinctNode) {
/*
@@ -456,6 +710,75 @@ public class DistinctGroupbyBuilder {
}
+  /**
+   * Attaches distinct-aggregation enforcers to the three stages of a multi-level distinct
+   * aggregation. The first and second stages use hash aggregation. The third stage uses sort
+   * aggregation: each of its GroupbyNodes is sorted by the grouping columns of the first-stage
+   * GroupbyNode at the same position in the list.
+   *
+   * @param firstStageBlock block running the first stage
+   * @param firstStageDistinctNode first-stage node; its sub-nodes supply the third-stage sort keys
+   * @param secondStageBlock block running the second stage
+   * @param secondStageDistinctNode second-stage node
+   * @param thirdStageBlock block running the third stage
+   * @param thirdStageDistinctNode third-stage node; assumed to have the same number and order of
+   *                               GroupbyNodes as the first-stage node (both are clones of one base node)
+   */
+  private void setMultiStageAggregationEnforcer(
+      ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode,
+      ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode,
+      ExecutionBlock thirdStageBlock, DistinctGroupbyNode thirdStageDistinctNode) {
+    firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.FIRST_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.SECOND_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    List<GroupbyNode> firstStageGroupbyNodes = firstStageDistinctNode.getGroupByNodes();
+    List<GroupbyNode> thirdStageGroupbyNodes = thirdStageDistinctNode.getGroupByNodes();
+    List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
+    for (int index = 0; index < firstStageGroupbyNodes.size(); index++) {
+      List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
+      for (Column column: firstStageGroupbyNodes.get(index).getGroupingColumns()) {
+        sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
+      }
+      // Bind the sort spec to the third-stage GroupbyNode at the same position. The index advances
+      // with the loop; a fixed index would bind every spec to the first GroupbyNode's PID.
+      sortSpecArrays.add(SortSpecArray.newBuilder()
+          .setPid(thirdStageGroupbyNodes.get(index).getPID())
+          .addAllSortSpecs(sortSpecs).build());
+    }
+    // NOTE: THRID_STAGE is the spelling of the generated enum constant; keep it in sync with the proto.
+    thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.THRID_STAGE,
+        DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays);
+  }
+
+  /**
+   * Pushes a copy of the first-phase group-by down into every child block that feeds
+   * {@code lastBlock}, and installs the second-phase group-by, reading from a scan over the
+   * incoming data channel, as the plan of {@code lastBlock}.
+   *
+   * <pre>
+   *   (second phase)        G          (lastBlock)
+   *                       / | \
+   *   (first phase)      G  G  G       (child blocks)
+   * </pre>
+   *
+   * <p>The incoming channels already connect the blocks, so no new connection is created here.
+   *
+   * @return {@code lastBlock}, whose plan is now {@code secondPhaseGroupBy}
+   */
+  private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+                                                          DistinctGroupbyNode firstPhaseGroupBy,
+                                                          DistinctGroupbyNode secondPhaseGroupBy) {
+    LogicalPlan logicalPlan = masterPlan.getLogicalPlan();
+    // One partition when there are no grouping keys, 32 otherwise.
+    int numPartitions = firstPhaseGroupBy.isEmptyGrouping() ? 1 : 32;
+
+    DataChannel lastIncoming = null;
+    for (DataChannel incoming : masterPlan.getIncomingChannels(lastBlock.getId())) {
+      incoming.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), numPartitions);
+      incoming.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = masterPlan.getExecBlock(incoming.getSrcId());
+
+      // Every child block may have a different child plan, and therefore a different input schema
+      // for the group-by, so each one receives its own copy of the first-phase node.
+      DistinctGroupbyNode firstPhaseCopy = PlannerUtil.clone(logicalPlan, firstPhaseGroupBy);
+      firstPhaseCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseCopy);
+
+      // Only the last channel is needed to build the second-phase input scan.
+      lastIncoming = incoming;
+    }
+
+    ScanNode inputScan = GlobalPlanner.buildInputExecutor(logicalPlan, lastIncoming);
+    secondPhaseGroupBy.setChild(inputScan);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
static class DistinctGroupbyNodeBuildInfo {
private GroupbyNode groupbyNode;
private List<AggregationFunctionCallEval> aggFunctions = new ArrayList<AggregationFunctionCallEval>();
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
index b1e4bc3..47e8933 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -27,10 +27,15 @@ import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.util.TUtil;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable {
@Expose
+ private GroupbyNode groupbyPlan;
+
+ @Expose
private List<GroupbyNode> groupByNodes;
@Expose
@@ -42,6 +47,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
@Expose
private int[] resultColumnIds;
+ /** Aggregation Functions */
+ @Expose private AggregationFunctionCallEval [] aggrFunctions;
+
public DistinctGroupbyNode(int pid) {
super(pid, NodeType.DISTINCT_GROUP_BY);
}
@@ -59,7 +67,11 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
@Override
public Target[] getTargets() {
- return new Target[0];
+    // Report the projection targets once they have been set, and an empty array before that.
+    return hasTargets() ? targets : new Target[0];
}
public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
@@ -86,6 +98,18 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
this.resultColumnIds = resultColumnIds;
}
+ /**
+ * Returns the aggregation functions of the whole distinct group-by (across all sub GroupbyNodes).
+ * The field has no initializer, so this is null until setAggFunctions() is called.
+ */
+ public AggregationFunctionCallEval [] getAggFunctions() {
+ return this.aggrFunctions;
+ }
+
+ /** Sets the aggregation functions of the whole distinct group-by; the array is stored as is, not copied. */
+ public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+ this.aggrFunctions = evals;
+ }
+
+ /** Sets the original GroupbyNode this distinct node was derived from (kept for reference; deep-copied by clone()). */
+ public void setGroupbyPlan(GroupbyNode groupbyPlan) { this.groupbyPlan = groupbyPlan; }
+
+ /** Returns the original GroupbyNode this distinct node was derived from, or null if it was never set. */
+ public GroupbyNode getGroupbyPlan() { return this.groupbyPlan; }
+
@Override
public Object clone() throws CloneNotSupportedException {
DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
@@ -113,6 +137,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
}
}
+ if (groupbyPlan != null) {
+ cloneNode.groupbyPlan = (GroupbyNode)groupbyPlan.clone();
+ }
return cloneNode;
}
@@ -200,4 +227,27 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
return planStr;
}
+
+  /**
+   * Returns the shuffle keys used between the first and second stages of a multi-level distinct
+   * aggregation. In order, without duplicates, the keys are:
+   * <ol>
+   *   <li>the distinct-sequence column (assumed to be column 0 of the output schema),</li>
+   *   <li>the GROUP BY columns, and</li>
+   *   <li>the grouping columns of every sub GroupbyNode.</li>
+   * </ol>
+   *
+   * @return the shuffle key columns; never null
+   */
+  public Column[] getFirstStageShuffleKeyColumns() {
+    List<Column> shuffleKeyColumns = new ArrayList<Column>();
+    // NOTE(review): relies on the distinct sequence column being the first output column.
+    shuffleKeyColumns.add(getOutSchema().getColumn(0));
+    if (groupingColumns != null) {
+      for (Column eachColumn : groupingColumns) {
+        if (!shuffleKeyColumns.contains(eachColumn)) {
+          shuffleKeyColumns.add(eachColumn);
+        }
+      }
+    }
+    // groupByNodes is only populated by setGroupbyNodes(); guard against a node where it was never set.
+    if (groupByNodes != null) {
+      for (GroupbyNode eachGroupbyNode : groupByNodes) {
+        Column[] childGroupingColumns = eachGroupbyNode.getGroupingColumns();
+        if (childGroupingColumns == null) {
+          continue;
+        }
+        for (Column eachColumn : childGroupingColumns) {
+          if (!shuffleKeyColumns.contains(eachColumn)) {
+            shuffleKeyColumns.add(eachColumn);
+          }
+        }
+      }
+    }
+
+    return shuffleKeyColumns.toArray(new Column[shuffleKeyColumns.size()]);
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
new file mode 100644
index 0000000..7201ed4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Int2Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * This operator expands the input into more rows: for each distinct GroupbyNode it emits one row per unique
+ * combination of grouping key and distinct key. It also carries the aggregated values of the non-distinct columns.
+ *
+ * For example, there is a query as follows:
+ * select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ * count(distinct l_partkey), sum(l_orderkey)
+ * from lineitem
+ * group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ * If you execute above query on tajo, FileScanner makes tuples after scanning raw data as follows:
+ *
+ * -----------------------------------------------------------------------------
+ * l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_orderkey, l_partkey
+ * -----------------------------------------------------------------------------
+ * 1, N, O, 1996-03-13, 1, 1
+ * 2, N, O, 1996-04-12, 1, 1
+ * 1, N, O, 1997-01-28, 2, 2
+ * 1, R, F, 1994-02-02, 3, 2
+ * 2, R, F, 1993-11-09, 3, 3
+ *
+ * The scanner then feeds these tuples to this operator, which produces the following output:
+ *
+ *
+ * -------------------------------------------------------------------------------------------------------------------
+ * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ * l_orderkey for distinct, l_orderkey for nondistinct
+ * -------------------------------------------------------------------------------------------------------------------
+ * 0, 2, R, F, 1993-11-09, 3, NULL, 3
+ * 0, 2, N, O, 1996-04-12, 1, NULL, 1
+ * 0, 1, N, O, 1997-01-28, 2, NULL, 2
+ * 0, 1, R, F, 1994-02-02, 2, NULL, 3
+ * 0, 1, N, O, 1996-03-13, 1, NULL, 1
+ * 1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ * 1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ * 1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ * 1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ * 1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ * NodeSequence is the sequence number of the distinct GroupByNode that produced the row. In this example there
+ * are two distinct GroupByNodes: one for lineitem.l_partkey (sequence 0) and one for lineitem.l_orderkey
+ * (sequence 1). As the output above shows, columns that belong to a different distinct GroupByNode than the
+ * row's own are filled with NullDatum.
+ *
+ * In addition, the non-distinct aggregation columns contain real values only in rows of the first NodeSequence.
+ *
+ */
+
+public class DistinctGroupbyFirstAggregationExec extends PhysicalExec {
+ private static Log LOG = LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class);
+
+ private DistinctGroupbyNode plan;
+ private boolean finished = false;
+ private boolean preparedData = false;
+ private PhysicalExec child;
+
+ private long totalNumRows;
+ private int fetchedRows;
+ private float progress;
+
+ private int[] groupingKeyIndexes;
+ private NonDistinctHashAggregator nonDistinctHashAggregator;
+ private DistinctHashAggregator[] distinctAggregators;
+
+ private int resultTupleLength;
+
+ /**
+ * Creates the operator. The input and output schemas are taken from {@code plan}; aggregators are
+ * set up later in init().
+ *
+ * @param context task attempt context
+ * @param plan the DistinctGroupbyNode this operator executes
+ * @param subOp child operator that supplies the input tuples
+ */
+ public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
+ throws IOException {
+ super(context, plan.getInSchema(), plan.getOutSchema());
+ this.child = subOp;
+ this.plan = plan;
+ }
+
+  /**
+   * Initializes the child, resolves the positions of the GROUP BY columns in the input schema, and
+   * creates one hash aggregator per distinct GroupbyNode plus one for the non-distinct aggregations.
+   *
+   * @throws IOException if the child fails to initialize or a grouping column is not in the input schema
+   */
+  @Override
+  public void init() throws IOException {
+    super.init();
+    child.init();
+
+    // Find the index of every grouping column in the input schema.
+    Column[] groupingColumns = plan.getGroupingColumns();
+    groupingKeyIndexes = new int[groupingColumns.length];
+
+    int index = 0;
+    for (Column col: groupingColumns) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      // NOTE(review): assumes the Schema lookups return a negative id for unknown columns. Failing here
+      // gives a clear error instead of an index error later while reading input tuples.
+      if (keyIndex < 0) {
+        throw new IOException("Cannot find grouping column " + col + " in the input schema: " + inSchema);
+      }
+      groupingKeyIndexes[index++] = keyIndex;
+    }
+    // 1 is the Sequence Datum, which indicates the sequence of the DistinctNode.
+    resultTupleLength = groupingKeyIndexes.length + 1;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>();
+    int distinctSeq = 0;
+    for (GroupbyNode eachGroupby: groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        // Distinct GroupbyNodes are numbered 0, 1, 2, ... in list order.
+        DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby);
+        aggregator.setNodeSequence(distinctSeq++);
+        distinctAggrList.add(aggregator);
+        resultTupleLength += aggregator.getTupleLength();
+      } else {
+        nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby);
+        resultTupleLength += nonDistinctHashAggregator.getTupleLength();
+      }
+    }
+    distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{});
+  }
+
+  // Index of the distinct aggregator whose rows are currently being returned.
+  private int currentAggregatorIndex = 0;
+
+  /**
+   * Returns the next output row. On the first call, the whole input is consumed and aggregated.
+   * Rows are drained one distinct aggregator at a time. Null is returned once a full cycle over the
+   * aggregators yields no row, or when the task is stopped.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (!preparedData) {
+      prepareInputData();
+    }
+
+    final int startIndex = currentAggregatorIndex;
+    while (!context.isStopped()) {
+      Tuple row = distinctAggregators[currentAggregatorIndex].next();
+      if (row != null) {
+        return row;
+      }
+      // The current aggregator is drained; advance to the next one, wrapping around.
+      currentAggregatorIndex = (currentAggregatorIndex + 1) % distinctAggregators.length;
+      if (currentAggregatorIndex == startIndex) {
+        // Back where this call started: no aggregator has anything left.
+        finished = true;
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Drains the child operator. Every input tuple is fed to each distinct aggregator and, if present,
+   * to the non-distinct aggregator. The distinct aggregators are then rewound so their results can be
+   * read by next().
+   */
+  private void prepareInputData() throws IOException {
+    Tuple row = null;
+    while (!context.isStopped() && (row = child.next()) != null) {
+      // A fresh key tuple per row: the aggregators may keep it as a hash-map key.
+      Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
+      for (int keyPos = 0; keyPos < groupingKeyIndexes.length; keyPos++) {
+        groupingKey.put(keyPos, row.get(groupingKeyIndexes[keyPos]));
+      }
+      for (DistinctHashAggregator aggregator : distinctAggregators) {
+        aggregator.compute(groupingKey, row);
+      }
+      if (nonDistinctHashAggregator != null) {
+        nonDistinctHashAggregator.compute(groupingKey, row);
+      }
+    }
+
+    for (DistinctHashAggregator aggregator : distinctAggregators) {
+      aggregator.rescan();
+    }
+
+    totalNumRows = distinctAggregators[0].distinctAggrDatas.size();
+    preparedData = true;
+  }
+
+ /** Closes the child operator. Aggregation state held by this operator is not cleared here. */
+ @Override
+ public void close() throws IOException {
+ child.close();
+ }
+
+  /** Returns the child's input statistics, or null when there is no child. */
+  @Override
+  public TableStats getInputStats() {
+    return child == null ? null : child.getInputStats();
+  }
+
+  /**
+   * Returns the progress of this operator. While running with a known row total, half of the
+   * fraction of fetched rows is added to the base progress; otherwise the base progress is returned.
+   */
+  @Override
+  public float getProgress() {
+    if (!finished && totalNumRows > 0) {
+      return progress + ((float) fetchedRows / (float) totalNumRows) * 0.5f;
+    }
+    return progress;
+  }
+
+  /**
+   * Restarts output from the first distinct aggregator. The already prepared (aggregated) data is
+   * kept and the child is not re-read.
+   */
+  @Override
+  public void rescan() {
+    currentAggregatorIndex = 0;
+    finished = false;
+    for (DistinctHashAggregator aggregator : distinctAggregators) {
+      aggregator.rescan();
+    }
+  }
+
+  /**
+   * Hash aggregator for the non-distinct aggregation functions. For each grouping key it keeps one
+   * FunctionContext per aggregation function. All functions are switched to first phase.
+   */
+  class NonDistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+    private int aggFunctionsNum;
+    private final AggregationFunctionCallEval aggFunctions[];
+
+    // GroupingKey -> FunctionContext[]
+    private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas;
+    private int tupleLength;
+
+    // A row of NullDatum values, one per aggregation function.
+    private Tuple dummyTuple;
+
+    private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+      this.nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>();
+
+      this.aggFunctions = groupbyNode.hasAggFunctions()
+          ? groupbyNode.getAggFunctions()
+          : new AggregationFunctionCallEval[0];
+      this.aggFunctionsNum = aggFunctions.length;
+      for (AggregationFunctionCallEval eachFunction : aggFunctions) {
+        eachFunction.setFirstPhase();
+      }
+
+      this.dummyTuple = new VTuple(aggFunctionsNum);
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        dummyTuple.put(i, NullDatum.get());
+      }
+      this.tupleLength = aggFunctionsNum;
+    }
+
+    /** Merges {@code tuple} into the contexts of {@code groupingKeyTuple}, creating them on first sight. */
+    public void compute(Tuple groupingKeyTuple, Tuple tuple) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple);
+      boolean firstOccurrence = (contexts == null);
+      if (firstOccurrence) {
+        contexts = new FunctionContext[aggFunctionsNum];
+      }
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        if (firstOccurrence) {
+          contexts[i] = aggFunctions[i].newContext();
+        }
+        aggFunctions[i].merge(contexts[i], inSchema, tuple);
+      }
+      if (firstOccurrence) {
+        nonDistinctAggrDatas.put(groupingKeyTuple, contexts);
+      }
+    }
+
+    /** Returns the terminated aggregation values for {@code groupingKey}, or null if the key was never seen. */
+    public Tuple aggregate(Tuple groupingKey) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey);
+      if (contexts == null) {
+        return null;
+      }
+      Tuple result = new VTuple(aggFunctionsNum);
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        result.put(i, aggFunctions[i].terminate(contexts[i]));
+      }
+      return result;
+    }
+
+    /** Returns the number of columns this aggregator contributes to an output row. */
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    /** Returns the all-NullDatum row. */
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+
+ /**
+ * Hash aggregator for one DISTINCT aggregation (one GroupbyNode).
+ * compute() collects, for each grouping key, the set of distinct-key tuples seen with it.
+ * next() then emits one output tuple per (grouping key, distinct key) pair, laid out as
+ * [node sequence, grouping keys, distinct-key slots of every distinct aggregator, non-distinct values].
+ * Only this aggregator's distinct-key slot carries real values; the others are filled with the
+ * other aggregators' all-NULL dummy tuples.
+ */
+ class DistinctHashAggregator {
+ private GroupbyNode groupbyNode;
+
+ // grouping key -> set of distinct-key tuples seen together with that grouping key
+ private Map<Tuple, Set<Tuple>> distinctAggrDatas;
+ // iterator over distinctAggrDatas; only rescan() assigns it, so rescan() must run before next()
+ private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null;
+
+ // index of this aggregator within distinctAggregators; written as column 0 of every output tuple
+ private int nodeSequence;
+ private Int2Datum nodeSequenceDatum;
+
+ // input-column indexes of this node's grouping columns that are not outer grouping keys
+ private int[] distinctKeyIndexes;
+
+ private int tupleLength;
+ // all-NULL tuple with one slot per distinct-key column; other aggregators use it as filler
+ private Tuple dummyTuple;
+ private boolean aggregatorFinished = false;
+
+ public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+ this.groupbyNode = groupbyNode;
+
+ // Outer grouping-key columns are excluded from the distinct key.
+ Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+ for (Integer eachIndex: groupingKeyIndexes) {
+ groupingKeyIndexSet.add(eachIndex);
+ }
+
+ List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>();
+ Column[] groupingColumns = groupbyNode.getGroupingColumns();
+ for (int idx = 0; idx < groupingColumns.length; idx++) {
+ Column col = groupingColumns[idx];
+ int keyIndex;
+ // qualified columns are resolved by qualified name, others by simple name
+ if (col.hasQualifier()) {
+ keyIndex = inSchema.getColumnId(col.getQualifiedName());
+ } else {
+ keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+ }
+ if (!groupingKeyIndexSet.contains(keyIndex)) {
+ distinctGroupingKeyIndexSet.add(keyIndex);
+ }
+ }
+ int index = 0;
+ this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()];
+ this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size());
+ for (Integer eachId : distinctGroupingKeyIndexSet) {
+ this.dummyTuple.put(index, NullDatum.get());
+ this.distinctKeyIndexes[index++] = eachId;
+ }
+
+ this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>();
+ this.tupleLength = distinctKeyIndexes.length;
+ }
+
+ // Sets this aggregator's position and the INT2 datum emitted as the first output column.
+ public void setNodeSequence(int nodeSequence) {
+ this.nodeSequence = nodeSequence;
+ this.nodeSequenceDatum = new Int2Datum((short)nodeSequence);
+ }
+
+ public int getTupleLength() {
+ return tupleLength;
+ }
+
+ // Projects the input row onto the distinct-key columns and adds it to the grouping key's set.
+ public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
+ Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
+ for (int i = 0; i < distinctKeyIndexes.length; i++) {
+ distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i]));
+ }
+
+ Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
+ if (distinctEntry == null) {
+ distinctEntry = new HashSet<Tuple>();
+ distinctAggrDatas.put(groupingKey, distinctEntry);
+ }
+ distinctEntry.add(distinctKeyTuple);
+ }
+
+ // Restarts iteration over the collected data from the first grouping key.
+ public void rescan() {
+ iterator = distinctAggrDatas.entrySet().iterator();
+ currentGroupingTuples = null;
+ groupingKeyChanged = false;
+ aggregatorFinished = false;
+ }
+
+ // Releases the hash table. distinctAggrDatas becomes null, so rescan()/compute() must not follow.
+ public void close() throws IOException {
+ distinctAggrDatas.clear();
+ distinctAggrDatas = null;
+ currentGroupingTuples = null;
+ iterator = null;
+ }
+
+ // Iteration state: the current grouping-key entry and the iterator over its distinct keys.
+ Entry<Tuple, Set<Tuple>> currentGroupingTuples;
+ Iterator<Tuple> distinctKeyIterator;
+ // true until the first tuple of a new grouping key has carried the non-distinct values
+ boolean groupingKeyChanged = false;
+
+ // Returns the next (grouping key, distinct key) output tuple, or null once all pairs were emitted.
+ public Tuple next() {
+ if (aggregatorFinished) {
+ return null;
+ }
+ if (currentGroupingTuples == null) {
+ // first
+ if (!iterator.hasNext()) {
+ // Empty case
+ aggregatorFinished = true;
+ return null;
+ }
+ currentGroupingTuples = iterator.next();
+ groupingKeyChanged = true;
+ distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+ }
+ // Current grouping key exhausted: advance to the next one. compute() never stores an empty set,
+ // so a single advance is enough.
+ if (!distinctKeyIterator.hasNext()) {
+ if (!iterator.hasNext()) {
+ aggregatorFinished = true;
+ return null;
+ }
+ currentGroupingTuples = iterator.next();
+ groupingKeyChanged = true;
+ distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+ }
+ // Output layout: node sequence, grouping keys, 1st aggregator's distinct keys, 2nd aggregator's distinct keys, ...
+ // Only the slot of the aggregator whose index equals this.nodeSequence holds real data; the others hold NullDatum.
+ // NOTE(review): resultTupleLength is defined in the enclosing class; presumably it equals the sum of all slot widths.
+ Tuple tuple = new VTuple(resultTupleLength);
+ int tupleIndex = 0;
+ tuple.put(tupleIndex++, nodeSequenceDatum);
+
+ // merge grouping key
+ Tuple groupingKeyTuple = currentGroupingTuples.getKey();
+ int groupingKeyLength = groupingKeyTuple.size();
+ for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
+ tuple.put(tupleIndex, groupingKeyTuple.get(i));
+ }
+
+ // merge distinctKey
+ for (int i = 0; i < distinctAggregators.length; i++) {
+ if (i == nodeSequence) {
+ Tuple distinctKeyTuple = distinctKeyIterator.next();
+ int distinctKeyLength = distinctKeyTuple.size();
+ for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
+ tuple.put(tupleIndex, distinctKeyTuple.get(j));
+ }
+ } else {
+ Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
+ int dummyTupleSize = dummyTuple.size();
+ for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
+ tuple.put(tupleIndex, dummyTuple.get(j));
+ }
+ }
+ }
+
+ // merge non distinct aggregation tuple
+ // Real non-distinct values are emitted only by aggregator 0, and only on the first tuple of each
+ // grouping key; every other tuple carries the all-NULL dummy tuple in these columns.
+ if (nonDistinctHashAggregator != null) {
+ Tuple nonDistinctTuple;
+ if (nodeSequence == 0 && groupingKeyChanged) {
+ groupingKeyChanged = false;
+ nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKeyTuple);
+ if (nonDistinctTuple == null) {
+ nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+ }
+ } else {
+ nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+ }
+ int tupleSize = nonDistinctTuple.size();
+ for (int j = 0; j < tupleSize; j++, tupleIndex++) {
+ tuple.put(tupleIndex, nonDistinctTuple.get(j));
+ }
+ }
+ return tuple;
+ }
+
+ public Tuple getDummyTuple() {
+ return dummyTuple;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
new file mode 100644
index 0000000..bc8885f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -0,0 +1,295 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class sits between DistinctGroupbyFirstAggregationExec and DistinctGroupbyThirdAggregationExec and
+ * adjusts the shuffle columns between them. Its input is shuffled by the grouping columns and the distinct
+ * aggregation columns. Because of this shuffle, more DistinctGroupbyThirdAggregationExec instances run than
+ * with the previous two-stage distinct group-by algorithm, which improves the performance of count distinct queries.
+ *
+ * For example, there is a query as follows:
+ * select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+ * count(distinct l_partkey), sum(l_orderkey)
+ * from lineitem
+ * group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ * In this case, execution plan for this operator will set shuffle type as follows:
+ * Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4),
+ * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), default.lineitem.l_shipdate (TEXT),
+ * default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), num=32)
+ *
+ * Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4),
+ * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT),
+ * default.lineitem.l_shipdate (TEXT), num=32)
+ *
+ * For reference, input data and output data results as follows:
+ *
+ * -------------------------------------------------------------------------------------------------------------------
+ * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct,
+ * l_orderkey for distinct, l_orderkey for nondistinct
+ * -------------------------------------------------------------------------------------------------------------------
+ * 0, 2, R, F, 1993-11-09, 3, NULL, 3
+ * 0, 2, N, O, 1996-04-12, 1, NULL, 1
+ * 0, 1, N, O, 1997-01-28, 2, NULL, 2
+ * 0, 1, R, F, 1994-02-02, 2, NULL, 3
+ * 0, 1, N, O, 1996-03-13, 1, NULL, 1
+ * 1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ * 1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ * 1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ * 1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ * 1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ */
+public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  // Number of outer grouping columns; in the input tuple they follow the sequence column at index 0.
+  private int numGroupingColumns;
+  // Indexed by distinct sequence: input indexes of that aggregator's distinct-key columns
+  // (its grouping columns minus the outer grouping columns).
+  private int[][] distinctKeyIndexes;
+  private FunctionContext[] nonDistinctAggrContexts;
+  private AggregationFunctionCallEval[] nonDistinctAggrFunctions;
+  // Input index of the first non-distinct aggregation value; those values occupy the tail of the tuple.
+  private int nonDistinctAggrTupleStartIndex = -1;
+
+  public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  /**
+   * Initializes the child and precomputes, for each distinct aggregator, the input indexes of its
+   * distinct-key columns. Non-distinct aggregation functions are switched to their intermediate phase.
+   */
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    // Input indexes of the outer grouping columns; they are excluded from every distinct key.
+    Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+    for (Column col : plan.getGroupingColumns()) {
+      groupingKeyIndexSet.add(getInputColumnIndex(col));
+    }
+
+    int numDistinct = 0;
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        numDistinct++;
+      } else {
+        nonDistinctAggrFunctions = eachGroupby.getAggFunctions();
+        if (nonDistinctAggrFunctions != null) {
+          for (AggregationFunctionCallEval eachFunction : nonDistinctAggrFunctions) {
+            eachFunction.setIntermediatePhase();
+          }
+          nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];
+        }
+      }
+    }
+
+    int index = 0;
+    distinctKeyIndexes = new int[numDistinct][];
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (!eachGroupby.isDistinct()) {
+        continue;
+      }
+      List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>();
+      for (Column col : eachGroupby.getGroupingColumns()) {
+        int keyIndex = getInputColumnIndex(col);
+        if (!groupingKeyIndexSet.contains(keyIndex)) {
+          distinctGroupingKeyIndex.add(keyIndex);
+        }
+      }
+      int[] keyIndexes = new int[distinctGroupingKeyIndex.size()];
+      for (int i = 0; i < keyIndexes.length; i++) {
+        keyIndexes[i] = distinctGroupingKeyIndex.get(i);
+      }
+      distinctKeyIndexes[index++] = keyIndexes;
+    }
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrTupleStartIndex = inSchema.size() - nonDistinctAggrFunctions.length;
+    }
+  }
+
+  /** Resolves a column to its input-schema index, by qualified name when it has a qualifier. */
+  private int getInputColumnIndex(Column col) {
+    if (col.hasQualifier()) {
+      return inSchema.getColumnId(col.getQualifiedName());
+    }
+    return inSchema.getColumnIdByName(col.getSimpleName());
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+  int prevSeq = -1;
+
+  /**
+   * Returns one tuple per run of consecutive input tuples that share the same key
+   * (sequence, grouping columns, distinct-key columns), or null when the input is exhausted.
+   * For sequence 0 the non-distinct aggregation columns of the returned tuple are replaced by the
+   * intermediate result merged over the whole run.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple result = null;
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          return null;
+        }
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+        break;
+      }
+
+      // The child may reuse its tuple object, so keep a private copy.
+      Tuple tuple;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getKeyTuple(distinctSeq, tuple);
+
+      boolean keyChanged = prevKeyTuple == null || !prevKeyTuple.equals(keyTuple);
+      // A key change after at least one tuple completes the previous run, which is emitted.
+      boolean emitPrevious = prevKeyTuple != null && keyChanged;
+      if (emitPrevious) {
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+      }
+
+      prevKeyTuple = keyTuple;
+      prevTuple = tuple;
+      prevSeq = distinctSeq;
+
+      if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+        if (keyChanged) {
+          initNonDistinctAggrContext();
+        }
+        mergeNonDistinctAggr(tuple);
+      }
+
+      if (emitPrevious) {
+        break;
+      }
+    }
+
+    return result;
+  }
+
+  private void initNonDistinctAggrContext() {
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];
+      for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+        nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext();
+      }
+    }
+  }
+
+  private void mergeNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, tuple);
+    }
+  }
+
+  /** Overwrites the non-distinct aggregation columns of the tuple with the terminated contexts. */
+  private void terminatedNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      tuple.put(nonDistinctAggrTupleStartIndex + i, nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i]));
+    }
+  }
+
+  /** Builds the run key: sequence column, outer grouping columns, then this sequence's distinct keys. */
+  private Tuple getKeyTuple(int distinctSeq, Tuple tuple) {
+    int[] columnIndexes = distinctKeyIndexes[distinctSeq];
+
+    Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1);
+    keyTuple.put(0, tuple.get(0));
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i + 1, tuple.get(i + 1));
+    }
+    for (int i = 0; i < columnIndexes.length; i++) {
+      keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i]));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    // reset alongside prevTuple so no stale sequence survives a rescan
+    prevSeq = -1;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
new file mode 100644
index 0000000..239dabf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class aggregates the output of DistinctGroupbySecondAggregationExec.
+ *
+ */
+public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  // one final aggregator per distinct sequence, indexed by the sequence column of the input
+  private DistinctFinalAggregator[] aggregators;
+  private DistinctFinalAggregator nonDistinctAggr;
+
+  private int resultTupleLength;
+  private int numGroupingColumns;
+
+  // internal result position -> position in the output schema
+  private int[] resultTupleIndexes;
+
+  public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  /**
+   * Initializes the child, creates one final aggregator per distinct group-by node (plus one for the
+   * non-distinct node, if any) and builds the mapping from internal result positions to output-schema
+   * positions.
+   *
+   * @throws IOException if an output column cannot be matched to any grouping or aggregation column
+   */
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+    resultTupleLength = numGroupingColumns;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
+    int inTupleIndex = 1 + numGroupingColumns;
+    int outTupleIndex = numGroupingColumns;
+    int distinctSeq = 0;
+
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
+        distinctSeq++;
+
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        inTupleIndex += distinctGroupingColumns.length;
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      } else {
+        nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      }
+      resultTupleLength += eachGroupby.getAggFunctions().length;
+    }
+    aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+
+    // make output schema mapping index
+    resultTupleIndexes = new int[outSchema.size()];
+    Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>();
+    int resultTupleIndex = 0;
+    for (Column eachColumn : plan.getGroupingColumns()) {
+      groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
+      resultTupleIndex++;
+    }
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      Set<Column> groupingColumnSet = new HashSet<Column>();
+      for (Column column : eachGroupby.getGroupingColumns()) {
+        groupingColumnSet.add(column);
+      }
+      for (Target eachTarget : eachGroupby.getTargets()) {
+        if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
+          // aggregation function target
+          groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex);
+          resultTupleIndex++;
+        }
+      }
+    }
+
+    int index = 0;
+    for (Column eachOutputColumn : outSchema.getColumns()) {
+      // Match by qualified name only: for an avg aggregation the output schema's column type is float,
+      // while the column type in groupbyResultTupleIndex is protobuf, so Column equality would fail.
+      int matchedIndex = -1;
+      for (Column eachIndexColumn : groupbyResultTupleIndex.keySet()) {
+        if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) {
+          matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
+          break;
+        }
+      }
+      if (matchedIndex < 0) {
+        throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
+      }
+      resultTupleIndexes[matchedIndex] = index++;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+
+  /**
+   * Returns one result tuple per group of consecutive input tuples with equal grouping columns.
+   * Returns null when the input is exhausted. The exception is input that is empty from the start
+   * while there are no grouping columns: then a single tuple is built from fresh (empty) contexts.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple resultTuple = new VTuple(resultTupleLength);
+
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      // Last tuple
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty input: without grouping columns emit a tuple of aggregates over empty contexts.
+          return numGroupingColumns == 0 ? makeEmptyTuple() : null;
+        }
+
+        terminateGroup(resultTuple);
+        break;
+      }
+
+      // The child may reuse its tuple object, so keep a private copy.
+      Tuple tuple;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getGroupingKeyTuple(tuple);
+
+      // The first tuple only starts a group; a later key change completes the previous group.
+      boolean groupChanged = prevKeyTuple != null && !prevKeyTuple.equals(keyTuple);
+      if (groupChanged) {
+        terminateGroup(resultTuple);
+      }
+
+      prevKeyTuple = keyTuple;
+      prevTuple = tuple;
+      aggregators[distinctSeq].merge(tuple);
+
+      if (groupChanged) {
+        break;
+      }
+    }
+
+    return resultTuple;
+  }
+
+  /** Writes the grouping columns of prevTuple and the final value of every aggregator into resultTuple. */
+  private void terminateGroup(Tuple resultTuple) {
+    for (int i = 0; i < numGroupingColumns; i++) {
+      resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+    }
+    for (DistinctFinalAggregator eachAggr : aggregators) {
+      eachAggr.terminate(resultTuple);
+    }
+  }
+
+  private Tuple makeEmptyTuple() {
+    Tuple resultTuple = new VTuple(resultTupleLength);
+    for (DistinctFinalAggregator eachAggr : aggregators) {
+      eachAggr.terminateEmpty(resultTuple);
+    }
+
+    return resultTuple;
+  }
+
+  /** Extracts the grouping columns, which follow the sequence column at index 0. */
+  private Tuple getGroupingKeyTuple(Tuple tuple) {
+    Tuple keyTuple = new VTuple(numGroupingColumns);
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i, tuple.get(i + 1));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+    // next() merges the first tuple of the following group before returning, so the contexts may hold
+    // partial state here. Discard it, otherwise it is merged into the first group of the new scan.
+    if (aggregators != null) {
+      for (DistinctFinalAggregator eachAggr : aggregators) {
+        eachAggr.reset();
+      }
+    }
+    if (nonDistinctAggr != null) {
+      nonDistinctAggr.reset();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  /**
+   * Final-phase aggregator for one group-by node. seq is the distinct sequence, or -1 for the
+   * non-distinct node. The aggregator with seq 0 also drives the non-distinct aggregator.
+   */
+  class DistinctFinalAggregator {
+    private FunctionContext[] functionContexts;
+    private AggregationFunctionCallEval[] aggrFunctions;
+    private int seq;
+    private int inTupleIndex;
+    private int outTupleIndex;
+
+    public DistinctFinalAggregator(int seq, int inTupleIndex, int outTupleIndex, GroupbyNode groupbyNode) {
+      this.seq = seq;
+      this.inTupleIndex = inTupleIndex;
+      this.outTupleIndex = outTupleIndex;
+
+      aggrFunctions = groupbyNode.getAggFunctions();
+      if (aggrFunctions != null) {
+        for (AggregationFunctionCallEval eachFunction : aggrFunctions) {
+          eachFunction.setFinalPhase();
+        }
+      }
+      newFunctionContext();
+    }
+
+    private void newFunctionContext() {
+      functionContexts = new FunctionContext[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        functionContexts[i] = aggrFunctions[i].newContext();
+      }
+    }
+
+    /** Discards any partially merged state. */
+    void reset() {
+      newFunctionContext();
+    }
+
+    public void merge(Tuple tuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        aggrFunctions[i].merge(functionContexts[i], inSchema, tuple);
+      }
+
+      // Non-distinct values travel with sequence 0 and are NULL on filler rows; skip those.
+      if (seq == 0 && nonDistinctAggr != null) {
+        if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) {
+          nonDistinctAggr.merge(tuple);
+        }
+      }
+    }
+
+    /** Writes the final values into resultTuple and starts fresh contexts for the next group. */
+    public void terminate(Tuple resultTuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      newFunctionContext();
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminate(resultTuple);
+      }
+    }
+
+    public void terminateEmpty(Tuple resultTuple) {
+      newFunctionContext();
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminateEmpty(resultTuple);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 4deddee..598054c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -33,10 +33,8 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
-import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -45,6 +43,8 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.master.TaskSchedulerContext;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -799,13 +799,30 @@ public class Repartitioner {
}
int groupingColumns = 0;
- GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
- if (groupby != null) {
- groupingColumns = groupby.getGroupingColumns().length;
- } else {
- DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
- if (dGroupby != null) {
- groupingColumns = dGroupby.getGroupingColumns().length;
+ LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+ new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
+ if (groupbyNodes != null && groupbyNodes.length > 0) {
+ LogicalNode bottomNode = groupbyNodes[0];
+ if (bottomNode.getType() == NodeType.GROUP_BY) {
+ groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length;
+ } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+ if (distinctNode == null) {
+ LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+ distinctNode = (DistinctGroupbyNode)bottomNode;
+ }
+ groupingColumns = distinctNode.getGroupingColumns().length;
+
+ Enforcer enforcer = execBlock.getEnforcer();
+ EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+ if (property != null) {
+ if (property.getDistinct().getIsMultipleAggregation()) {
+ MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+ if (stage != MultipleAggregationStage.THRID_STAGE) {
+ groupingColumns = distinctNode.getOutSchema().size();
+ }
+ }
+ }
}
}
// get a proper number of tasks
@@ -1145,7 +1162,8 @@ public class Repartitioner {
// set the partition number for group by and sort
if (channel.getShuffleType() == HASH_SHUFFLE) {
- if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+ if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
+ execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
keys = channel.getShuffleKeys();
}
} else if (channel.getShuffleType() == RANGE_SHUFFLE) {