You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:43 UTC
[45/48] git commit: TAJO-801: Multiple distinct should be supported.
(Hyoungjun Kim via hyunsik)
TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9350a802
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9350a802
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9350a802
Branch: refs/heads/window_function
Commit: 9350a8026b107da11ed2dc8457ad95d3f2153f0c
Parents: 1c21e8b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri May 16 08:53:01 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri May 16 08:53:01 2014 +0900
----------------------------------------------------------------------
CHANGES | 6 +-
.../main/java/org/apache/tajo/util/TUtil.java | 11 +
.../apache/tajo/engine/eval/EvalTreeUtil.java | 11 +
.../tajo/engine/parser/HiveQLAnalyzer.java | 3 +-
.../engine/planner/BasicLogicalPlanVisitor.java | 12 +
.../planner/ExplainLogicalPlanVisitor.java | 6 +
.../tajo/engine/planner/ExprAnnotator.java | 30 +-
.../tajo/engine/planner/LogicalPlanVisitor.java | 3 +
.../engine/planner/PhysicalPlannerImpl.java | 66 +++
.../apache/tajo/engine/planner/PlannerUtil.java | 6 +
.../engine/planner/PreLogicalPlanVerifier.java | 23 -
.../tajo/engine/planner/enforce/Enforcer.java | 38 ++
.../engine/planner/global/GlobalPlanner.java | 67 ++-
.../global/builder/DistinctGroupbyBuilder.java | 476 +++++++++++++++++++
.../planner/logical/DistinctGroupbyNode.java | 203 ++++++++
.../engine/planner/logical/GroupbyNode.java | 42 ++
.../tajo/engine/planner/logical/NodeType.java | 1 +
.../DistinctGroupbyHashAggregationExec.java | 388 +++++++++++++++
.../DistinctGroupbySortAggregationExec.java | 158 ++++++
.../planner/physical/ExternalSortExec.java | 3 +-
.../planner/physical/HashAggregateExec.java | 8 +-
.../planner/physical/SortAggregateExec.java | 8 +-
.../src/main/proto/TajoWorkerProtocol.proto | 17 +
.../tajo/engine/query/TestGroupByQuery.java | 49 +-
.../testDistinctAggregation7.sql | 6 +
.../testDistinctAggregation_case1.sql | 7 +
.../testDistinctAggregation_case2.sql | 8 +
.../testDistinctAggregation_case3.sql | 9 +
.../testDistinctAggregation_case4.sql | 10 +
.../testDistinctAggregation_case5.sql | 10 +
.../testDistinctAggregation_case6.sql | 12 +
.../testDistinctAggregation_case7.sql | 9 +
.../testDistinctAggregation7.result | 3 +
.../testDistinctAggregation_case1.result | 4 +
.../testDistinctAggregation_case2.result | 4 +
.../testDistinctAggregation_case3.result | 4 +
.../testDistinctAggregation_case4.result | 5 +
.../testDistinctAggregation_case5.result | 5 +
.../testDistinctAggregation_case6.result | 5 +
.../testDistinctAggregation_case7.result | 5 +
40 files changed, 1686 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 75deb0f..222f02e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,8 +4,8 @@ Release 0.9.0 - unreleased
NEW FEATURES
- TAJO-494: Extend TajoClient to run a query with a plan context serialized as the
- JSON form. (jihoon)
+ TAJO-494: Extend TajoClient to run a query with a plan context serialized
+ as the JSON form. (jihoon)
TAJO-761: Implements INTERVAL type. (Hyoungjun Kim via hyunsik)
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
+
TAJO-807: Implement Round(numeric, int) function.
(Seungun Choe via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 431c930..832c1e5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -236,6 +236,17 @@ public class TUtil {
return collection.toArray((T[]) array);
}
+  /**
+   * Converts a collection of boxed integers into a primitive int array.
+   * Elements are copied in the iteration order of the collection; a null
+   * element fails with a NullPointerException when it is unboxed.
+   *
+   * @param collection the integers to copy
+   * @return a new array whose length equals the size of the collection
+   */
+  public static int[] toArray(Collection<Integer> collection) {
+    int position = 0;
+    int[] unboxed = new int[collection.size()];
+    for (Integer value : collection) {
+      unboxed[position] = value;
+      position += 1;
+    }
+    return unboxed;
+  }
+ }
+
/**
* It returns the exact code point at which this running thread is executed.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 2417193..8982bd5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -140,6 +140,17 @@ public class EvalTreeUtil {
return schema;
}
+
+ public static String columnsToStr(Collection<Column> columns) {
+ StringBuilder sb = new StringBuilder();
+ String prefix = "";
+ for (Column column: columns) {
+ sb.append(prefix).append(column.getQualifiedName());
+ prefix = ",";
+ }
+
+ return sb.toString();
+ }
public static DataType getDomainByExpr(Schema inputSchema, EvalNode expr)
throws InternalException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
index 60e9685..de4b159 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
@@ -1141,7 +1141,8 @@ public class HiveQLAnalyzer extends HiveQLParserBaseVisitor<Expr> {
boolean isDistinct = false;
if (ctx.getChild(2) != null) {
- if (ctx.getChild(2) instanceof TerminalNodeImpl && ctx.getChild(2).getText().equalsIgnoreCase("DISTINCT")) {
+ // The terminal's text is the keyword as written in the query, so compare it with DISTINCT;
+ // DISTINCT_GROUP_BY is a planner node/enforce type name, not query text.
+ if (ctx.getChild(2) instanceof TerminalNodeImpl
+ && ctx.getChild(2).getText().equalsIgnoreCase("DISTINCT")) {
isDistinct = true;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 0f758bf..3bffefb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -74,6 +74,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
case GROUP_BY:
current = visitGroupBy(context, plan, block, (GroupbyNode) node, stack);
break;
+ case DISTINCT_GROUP_BY:
+ current = visitDistinct(context, plan, block, (DistinctGroupbyNode) node, stack);
+ break;
case SELECTION:
current = visitFilter(context, plan, block, (SelectionNode) node, stack);
break;
@@ -185,6 +188,15 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
}
@Override
+  public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                              Stack<LogicalNode> stack) throws PlanningException {
+    // The node stays on the stack only while its child subtree is being visited.
+    stack.push(node);
+    final RESULT childResult = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+
+    // Nothing is computed for the node itself; the child's result is handed back unchanged.
+    return childResult;
+  }
+
+ @Override
public RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
Stack<LogicalNode> stack) throws PlanningException {
stack.push(node);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
index 9dd8700..ad9bdf1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
@@ -108,6 +108,12 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo
return visitUnaryNode(context, plan, block, node, stack);
}
+ /**
+ * Handles a DistinctGroupbyNode by delegating to the shared visitUnaryNode helper
+ * and returning whatever that helper returns.
+ */
+ @Override
+ public LogicalNode visitDistinct(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+ Stack<LogicalNode> stack) throws PlanningException {
+ return visitUnaryNode(context, plan, block, node, stack);
+ }
+
private LogicalNode visitUnaryNode(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
UnaryNode node, Stack<LogicalNode> stack) throws PlanningException {
context.depth++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index dfbe600..e74fd70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -580,22 +580,22 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
try {
- CatalogProtos.FunctionType functionType = funcDesc.getFuncType();
- if (functionType == CatalogProtos.FunctionType.GENERAL
- || functionType == CatalogProtos.FunctionType.UDF) {
- return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
- } else if (functionType == CatalogProtos.FunctionType.AGGREGATION
- || functionType == CatalogProtos.FunctionType.UDA) {
- if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
- ctx.currentBlock.setAggregationRequire();
+ CatalogProtos.FunctionType functionType = funcDesc.getFuncType();
+ if (functionType == CatalogProtos.FunctionType.GENERAL
+ || functionType == CatalogProtos.FunctionType.UDF) {
+ return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
+ } else if (functionType == CatalogProtos.FunctionType.AGGREGATION
+ || functionType == CatalogProtos.FunctionType.UDA) {
+ if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
+ ctx.currentBlock.setAggregationRequire();
+ }
+ return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+ } else if (functionType == CatalogProtos.FunctionType.DISTINCT_AGGREGATION
+ || functionType == CatalogProtos.FunctionType.DISTINCT_UDA) {
+ throw new PlanningException("Unsupported function: " + funcDesc.toString());
+ } else {
+ throw new PlanningException("Unsupported Function Type: " + functionType.name());
}
- return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
- } else if (functionType == CatalogProtos.FunctionType.DISTINCT_AGGREGATION
- || functionType == CatalogProtos.FunctionType.DISTINCT_UDA) {
- throw new PlanningException("Unsupported function: " + funcDesc.toString());
- } else {
- throw new PlanningException("Unsupported Function Type: " + functionType.name());
- }
} catch (InternalException e) {
throw new PlanningException(e);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index 17b5d0a..6850046 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -41,6 +41,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
Stack<LogicalNode> stack) throws PlanningException;
+ RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+ Stack<LogicalNode> stack) throws PlanningException;
+
RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
Stack<LogicalNode> stack) throws PlanningException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2053e36..e508d2c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
@@ -38,6 +39,9 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -174,6 +178,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
stack.pop();
return createGroupByPlan(ctx, grpNode, leftExec);
+ case DISTINCT_GROUP_BY:
+ DistinctGroupbyNode distinctNode = (DistinctGroupbyNode) logicalNode;
+ stack.push(distinctNode);
+ leftExec = createPlanRecursive(ctx, distinctNode.getChild(), stack);
+ stack.pop();
+ return createDistinctGroupByPlan(ctx, distinctNode, leftExec);
+
case HAVING:
HavingNode havingNode = (HavingNode) logicalNode;
stack.push(havingNode);
@@ -962,6 +973,57 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
+ public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
+ DistinctGroupbyNode distinctNode, PhysicalExec subOp)
+ throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
+ if (property != null) {
+ DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
+ if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+ return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+ } else {
+ return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+ }
+ } else {
+ return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+ }
+ }
+
+ /**
+ * Creates the hash-based executor for a distinct group-by node by wrapping the given
+ * child operator in a DistinctGroupbyHashAggregationExec.
+ */
+ private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
+ DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
+ return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
+ }
+
+  /**
+   * Builds the sort-based executor for a distinct group-by node.
+   * Each group-by node held by the distinct node gets its own ExternalSortExec over
+   * {@code subOp}, configured with the sort specs found at the same position in the
+   * enforcer, and a SortAggregateExec on top of that sort.
+   *
+   * @param ctx task attempt context handed to every created executor
+   * @param distinctGroupbyNode the logical node whose group-by nodes are implemented
+   * @param subOp the child physical operator shared by all created sorts
+   * @param enforcer supplies one sort spec array per group-by node, matched by position
+   * @return a DistinctGroupbySortAggregationExec over the created aggregation executors
+   * @throws IOException propagated from the executor constructors
+   */
+  private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx,
+      DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp,
+      DistinctGroupbyEnforcer enforcer) throws IOException {
+    List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getGroupByNodes();
+    List<SortSpecArray> sortSpecArrays = enforcer.getSortSpecArraysList();
+    SortAggregateExec[] aggregators = new SortAggregateExec[groupbyNodes.size()];
+
+    for (int nodeIdx = 0; nodeIdx < aggregators.length; nodeIdx++) {
+      GroupbyNode groupbyNode = groupbyNodes.get(nodeIdx);
+
+      // Rebuild this group-by node's sort keys from their protobuf form.
+      List<SortSpecProto> protos = sortSpecArrays.get(nodeIdx).getSortSpecsList();
+      SortSpec[] sortSpecs = new SortSpec[protos.size()];
+      for (int keyIdx = 0; keyIdx < sortSpecs.length; keyIdx++) {
+        sortSpecs[keyIdx] = new SortSpec(protos.get(keyIdx));
+      }
+
+      // The sort reads the child's schema and emits the schema the group-by node expects.
+      SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      sortNode.setSortSpecs(sortSpecs);
+      sortNode.setInSchema(subOp.getSchema());
+      sortNode.setOutSchema(groupbyNode.getInSchema());
+
+      ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
+      aggregators[nodeIdx] = new SortAggregateExec(ctx, groupbyNode, sortExec);
+    }
+
+    return new DistinctGroupbySortAggregationExec(ctx, distinctGroupbyNode, aggregators);
+  }
+
public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
PhysicalExec child) throws IOException {
@@ -1025,6 +1087,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
type = EnforceType.JOIN;
} else if (node.getType() == NodeType.GROUP_BY) {
type = EnforceType.GROUP_BY;
+ } else if (node.getType() == NodeType.DISTINCT_GROUP_BY) {
+ type = EnforceType.DISTINCT_GROUP_BY;
} else if (node.getType() == NodeType.SORT) {
type = EnforceType.SORT;
} else if (node instanceof StoreTableNode
@@ -1043,6 +1107,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
found = property;
} else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
found = property;
+ } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getPid() == node.getPID()) {
+ found = property;
} else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
found = property;
} else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getPid() == node.getPID()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 1f97d14..a1ff0f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -670,6 +670,12 @@ public class PlannerUtil {
copy.setPID(-1);
} else {
copy.setPID(plan.newPID());
+ if (node instanceof DistinctGroupbyNode) {
+ DistinctGroupbyNode dNode = (DistinctGroupbyNode)copy;
+ for (GroupbyNode eachNode: dNode.getGroupByNodes()) {
+ eachNode.setPID(plan.newPID());
+ }
+ }
}
return copy;
} catch (CloneNotSupportedException e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 1ee0878..5eca5fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.planner;
-import com.google.common.collect.ObjectArrays;
import org.apache.tajo.algebra.*;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.CatalogUtil;
@@ -26,7 +25,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.util.TUtil;
-import java.util.Arrays;
import java.util.Set;
import java.util.Stack;
@@ -70,28 +68,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
}
}
- // no two aggregations can have different DISTINCT columns.
- //
- // For example, the following query will work
- // SELECT count(DISTINCT col1) and sum(DISTINCT col1) ..
- //
- // But, the following query will not work in this time
- //
- // SELECT count(DISTINCT col1) and SUM(DISTINCT col2) ..
Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
- if (exprs.size() > 0) {
- for (GeneralSetFunctionExpr setFunction : exprs) {
- if (distinctValues == null && setFunction.isDistinct()) {
- distinctValues = setFunction.getParams();
- } else if (distinctValues != null && setFunction.isDistinct()) {
- if (!Arrays.equals(distinctValues, setFunction.getParams())) {
- Expr [] differences = ObjectArrays.concat(distinctValues, setFunction.getParams(), Expr.class);
- throw new PlanningException("different DISTINCT columns are not supported yet: "
- + TUtil.arrayToString(differences));
- }
- }
- }
- }
// Currently, avg functions with distinct aggregation are not supported.
// This code does not allow users to use avg functions with distinct aggregation.
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 91190f6..742736c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -23,6 +23,8 @@ import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.util.TUtil;
import java.util.Collection;
@@ -130,6 +132,22 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
TUtil.putToNestedList(properties, builder.getType(), builder.build());
}
+  /**
+   * Registers a DISTINCT_GROUP_BY enforce property in this enforcer.
+   *
+   * @param pid the plan node id stored in the property
+   * @param algorithm the distinct aggregation algorithm to record
+   * @param sortSpecArrays sort spec arrays to attach; nothing is attached when this is null
+   */
+  public void enforceDistinctAggregation(int pid,
+                                         DistinctAggregationAlgorithm algorithm,
+                                         List<SortSpecArray> sortSpecArrays) {
+    EnforceProperty.Builder property = newProperty();
+
+    DistinctGroupbyEnforcer.Builder distinct = DistinctGroupbyEnforcer.newBuilder()
+        .setPid(pid)
+        .setAlgorithm(algorithm);
+    if (sortSpecArrays != null) {
+      distinct.addAllSortSpecArrays(sortSpecArrays);
+    }
+
+    property.setType(EnforceType.DISTINCT_GROUP_BY);
+    property.setDistinct(distinct.build());
+    TUtil.putToNestedList(properties, property.getType(), property.build());
+  }
+
public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
EnforceProperty.Builder builder = newProperty();
SortEnforce.Builder enforce = SortEnforce.newBuilder();
@@ -218,6 +236,26 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
}
}
break;
+ case DISTINCT_GROUP_BY:
+ DistinctGroupbyEnforcer distinct = property.getDistinct();
+ sb.append("type=Distinct,alg=");
+ if (distinct.getAlgorithm() == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+ sb.append("hash");
+ } else {
+ sb.append("sort");
+ sb.append(",keys=");
+ String recordDelim = "";
+ for (SortSpecArray sortSpecArray : distinct.getSortSpecArraysList()) {
+ sb.append(recordDelim);
+ String delim = "";
+ for (CatalogProtos.SortSpecProto sortSpec: sortSpecArray.getSortSpecsList()) {
+ sb.append(delim).append(sortSpec.getColumn().getName());
+ delim = ",";
+ }
+ recordDelim = " | ";
+ }
+ }
+ break;
case BROADCAST:
BroadcastEnforce broadcast = property.getBroadcast();
sb.append("type=Broadcast, tables=").append(broadcast.getTableName());
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 9002ac0..16def83 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.function.AggFunction;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import org.apache.tajo.exception.InternalException;
@@ -88,9 +89,21 @@ public class GlobalPlanner {
}
}
+ /** Returns this planner's {@code storeType} value. */
+ public CatalogProtos.StoreType getStoreType() {
+ return storeType;
+ }
+
+ // State handed to the plan-building methods: the MasterPlan being built and a map of
+ // ExecutionBlocks keyed by Integer. The fields have no access modifier (package-private);
+ // the public getters below make them readable from other packages.
public class GlobalPlanContext {
MasterPlan plan;
Map<Integer, ExecutionBlock> execBlockMap = Maps.newHashMap();
+
+ // Returns the plan field.
+ public MasterPlan getPlan() {
+ return plan;
+ }
+
+ // Returns the execBlockMap field itself, not a copy.
+ public Map<Integer, ExecutionBlock> getExecBlockMap() {
+ return execBlockMap;
+ }
}
/**
@@ -140,7 +153,7 @@ public class GlobalPlanner {
}
masterPlan.setTerminal(terminalBlock);
- LOG.info(masterPlan.toString());
+ LOG.info("\n" + masterPlan.toString());
}
private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
@@ -456,6 +469,47 @@ public class GlobalPlanner {
return rewritten;
}
+ public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+ DistinctGroupbyNode firstPhaseGroupBy,
+ DistinctGroupbyNode secondPhaseGroupBy) {
+ DataChannel lastDataChannel = null;
+
+ // It pushes down the first phase group-by operator into all child blocks.
+ //
+ // (second phase) G (currentBlock)
+ // /|\
+ // / / | \
+ // (first phase) G G G G (child block)
+
+ // They are already connected one another.
+ // So, we don't need to connect them again.
+ for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+ if (firstPhaseGroupBy.isEmptyGrouping()) {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+ } else {
+ dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
+ }
+ dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+ ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+ // Why must firstPhaseGroupby be copied?
+ //
+ // A groupby in each execution block can have different child.
+ // It affects groupby's input schema.
+ DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+ firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+ childBlock.setPlan(firstPhaseGroupbyCopy);
+
+ // just keep the last data channel.
+ lastDataChannel = dataChannel;
+ }
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+ secondPhaseGroupBy.setChild(scanNode);
+ lastBlock.setPlan(secondPhaseGroupBy);
+ return lastBlock;
+ }
+
/**
* If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows:
*
@@ -493,7 +547,7 @@ public class GlobalPlanner {
* As a result, although a no-distinct aggregation requires two stages, a distinct aggregation requires three
* execution blocks.
*/
- private ExecutionBlock buildGroupByIncludingDistinctFunctions(GlobalPlanContext context,
+ private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPlanContext context,
ExecutionBlock latestExecBlock,
GroupbyNode groupbyNode) throws PlanningException {
@@ -505,7 +559,6 @@ public class GlobalPlanner {
List<Target> firstPhaseEvalNodeTargets = Lists.newArrayList();
for (AggregationFunctionCallEval aggFunction : groupbyNode.getAggFunctions()) {
-
if (aggFunction.isDistinct()) {
// add distinct columns to first stage's grouping columns
firstStageGroupingColumns.addAll(EvalTreeUtil.findUniqueColumns(aggFunction));
@@ -535,6 +588,7 @@ public class GlobalPlanner {
for (Target target : firstPhaseEvalNodeTargets) {
firstStageTargets[i++] = target;
}
+
// Create the groupby node for the first stage and set all necessary descriptions
GroupbyNode firstStageGroupby = new GroupbyNode(context.plan.getLogicalPlan().newPID());
firstStageGroupby.setGroupingColumns(TUtil.toArray(firstStageGroupingColumns, Column.class));
@@ -577,12 +631,12 @@ public class GlobalPlanner {
private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
GroupbyNode groupbyNode) throws PlanningException {
-
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
- return buildGroupByIncludingDistinctFunctions(context, lastBlock, groupbyNode);
+ DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+ return builder.buildPlan(context, lastBlock, groupbyNode);
} else {
GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
@@ -597,7 +651,7 @@ public class GlobalPlanner {
return currentBlock;
}
- public boolean hasUnionChild(UnaryNode node) {
+ public static boolean hasUnionChild(UnaryNode node) {
// there are two cases:
//
@@ -676,6 +730,7 @@ public class GlobalPlanner {
private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+
ExecutionBlock childBlock = latestBlock;
childBlock.setPlan(firstPhaseGroupby);
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
new file mode 100644
index 0000000..1ccd9dc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.builder;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
+
+public class DistinctGroupbyBuilder {
+ private static Log LOG = LogFactory.getLog(DistinctGroupbyBuilder.class);
+ private GlobalPlanner globalPlanner;
+
+ public DistinctGroupbyBuilder(GlobalPlanner globalPlanner) {
+ this.globalPlanner = globalPlanner;
+ }
+
+
+ /**
+  * Builds the two-stage plan for a group-by that contains distinct aggregations.
+  *
+  * The given execution block becomes the first stage (enforced as hash aggregation) and a
+  * newly created execution block becomes the second stage (enforced as sort aggregation).
+  * The two are connected by a hash-shuffle channel keyed on the grouping columns.
+  *
+  * @param context         global planning context
+  * @param latestExecBlock block that will run the first-stage node
+  * @param currentNode     the group-by node to split; must be a {@link GroupbyNode}
+  * @return the newly created second-stage execution block
+  * @throws PlanningException wrapping any failure raised while building the plan
+  */
+ public ExecutionBlock buildPlan(GlobalPlanContext context,
+                                 ExecutionBlock latestExecBlock,
+                                 LogicalNode currentNode) throws PlanningException {
+   try {
+     GroupbyNode groupbyNode = (GroupbyNode) currentNode;
+     LogicalPlan logicalPlan = context.getPlan().getLogicalPlan();
+     DistinctGroupbyNode baseNode = buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
+
+     // Derive the first- and second-stage nodes from the base node.
+     DistinctGroupbyNode[] stages = createMultiPhaseDistinctNode(logicalPlan, groupbyNode, baseNode);
+     DistinctGroupbyNode firstStage = stages[0];
+     DistinctGroupbyNode secondStage = stages[1];
+
+     // The incoming block runs the first stage; a fresh block runs the second stage.
+     latestExecBlock.setPlan(firstStage);
+     ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock();
+
+     setDistinctAggregationEnforcer(latestExecBlock, firstStage, secondStageBlock, secondStage);
+
+     // Without grouping columns a single shuffle output is used; otherwise 32.
+     int shuffleOutputNum = groupbyNode.isEmptyGrouping() ? 1 : 32;
+     DataChannel channel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, shuffleOutputNum);
+     channel.setShuffleKeys(firstStage.getGroupingColumns());
+     channel.setSchema(firstStage.getOutSchema());
+     channel.setStoreType(globalPlanner.getStoreType());
+
+     // The second stage reads the shuffled first-stage output through a scan node.
+     ScanNode shuffleScan = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), channel);
+     secondStage.setChild(shuffleScan);
+     secondStageBlock.setPlan(secondStage);
+
+     context.getPlan().addConnect(channel);
+
+     if (GlobalPlanner.hasUnionChild(firstStage)) {
+       globalPlanner.buildDistinctGroupbyAndUnionPlan(
+           context.getPlan(), latestExecBlock, firstStage, firstStage);
+     }
+
+     return secondStageBlock;
+   } catch (Exception e) {
+     LOG.error(e.getMessage(), e);
+     throw new PlanningException(e);
+   }
+ }
+
+ /**
+  * Converts a {@link GroupbyNode} into a {@link DistinctGroupbyNode} holding one child
+  * group-by node per distinct column set, plus one child for all non-distinct aggregations.
+  *
+  * Example:
+  *   select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+  *   => DistinctGroupbyNode, grouping key = col1
+  *        - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+  *        - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+  *        - GroupByNode3: grouping(col1), expr(sum col4)
+  *
+  * @param context         global planning context, used to allocate new plan node ids
+  * @param latestExecBlock not read by this method
+  * @param groupbyNode     the original group-by node
+  * @return the base distinct node; its child is the original group-by's child
+  */
+ private DistinctGroupbyNode buildBaseDistinctGroupByNode(GlobalPlanContext context,
+                                                          ExecutionBlock latestExecBlock,
+                                                          GroupbyNode groupbyNode) {
+   List<Column> groupByColumns = Arrays.asList(groupbyNode.getGroupingColumns());
+
+   // Aggregations without DISTINCT are collected here and merged into a single child node.
+   List<AggregationFunctionCallEval> plainAggFunctions = new ArrayList<AggregationFunctionCallEval>();
+   List<Target> plainAggTargets = new ArrayList<Target>();
+
+   // Key: string form of the distinct columns. Functions over the same columns share a node.
+   Map<String, DistinctGroupbyNodeBuildInfo> buildInfoByColumns =
+       new HashMap<String, DistinctGroupbyNodeBuildInfo>();
+
+   AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions();
+   for (int i = 0; i < aggFunctions.length; i++) {
+     AggregationFunctionCallEval aggFunction = aggFunctions[i];
+     // Targets are laid out as: grouping columns first, then one target per aggregation.
+     Target aggTarget = groupbyNode.getTargets()[groupByColumns.size() + i];
+
+     if (!aggFunction.isDistinct()) {
+       plainAggFunctions.add(aggFunction);
+       plainAggTargets.add(aggTarget);
+       continue;
+     }
+
+     LinkedHashSet<Column> distinctColumns = EvalTreeUtil.findUniqueColumns(aggFunction);
+     String columnsKey = EvalTreeUtil.columnsToStr(distinctColumns);
+     DistinctGroupbyNodeBuildInfo buildInfo = buildInfoByColumns.get(columnsKey);
+     if (buildInfo == null) {
+       GroupbyNode distinctGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+       buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode);
+       buildInfoByColumns.put(columnsKey, buildInfo);
+
+       // Grouping columns = GROUP BY columns followed by the distinct columns not already present.
+       List<Column> nodeGroupingColumns = new ArrayList<Column>(groupByColumns);
+       for (Column distinctColumn : distinctColumns) {
+         if (!nodeGroupingColumns.contains(distinctColumn)) {
+           nodeGroupingColumns.add(distinctColumn);
+         }
+       }
+       distinctGroupbyNode.setGroupingColumns(nodeGroupingColumns.toArray(new Column[]{}));
+     }
+     buildInfo.addAggFunction(aggFunction);
+     buildInfo.addAggFunctionTarget(aggTarget);
+   }
+
+   List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>();
+
+   // One child group-by node per distinct column set.
+   for (DistinctGroupbyNodeBuildInfo buildInfo : buildInfoByColumns.values()) {
+     GroupbyNode distinctChild = buildInfo.getGroupbyNode();
+     List<AggregationFunctionCallEval> childAggFunctions = buildInfo.getAggFunctions();
+
+     List<Target> childTargets = new ArrayList<Target>();
+     for (Column column : distinctChild.getGroupingColumns()) {
+       childTargets.add(new Target(new FieldEval(column)));
+     }
+     childTargets.addAll(buildInfo.getAggFunctionTargets());
+
+     distinctChild.setTargets(childTargets.toArray(new Target[]{}));
+     distinctChild.setAggFunctions(childAggFunctions.toArray(new AggregationFunctionCallEval[]{}));
+     distinctChild.setDistinct(true);
+     distinctChild.setInSchema(groupbyNode.getInSchema());
+
+     childGroupbyNodes.add(distinctChild);
+   }
+
+   // All non-distinct aggregations go into one extra child node, appended last.
+   // Its output order is GROUP_BY_COL1, COL2, ... followed by the aggregation values.
+   if (!plainAggFunctions.isEmpty()) {
+     GroupbyNode plainChild = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+
+     List<Target> plainTargets = new ArrayList<Target>();
+     for (Column column : groupByColumns) {
+       plainTargets.add(new Target(new FieldEval(column)));
+     }
+     plainTargets.addAll(plainAggTargets);
+
+     plainChild.setTargets(plainTargets.toArray(new Target[]{}));
+     plainChild.setGroupingColumns(groupByColumns.toArray(new Column[]{}));
+     plainChild.setAggFunctions(plainAggFunctions.toArray(new AggregationFunctionCallEval[]{}));
+     plainChild.setInSchema(groupbyNode.getInSchema());
+
+     childGroupbyNodes.add(plainChild);
+   }
+
+   DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
+   baseDistinctNode.setTargets(groupbyNode.getTargets());
+   baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+   baseDistinctNode.setInSchema(groupbyNode.getInSchema());
+   baseDistinctNode.setChild(groupbyNode.getChild());
+
+   baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+
+   return baseDistinctNode;
+ }
+
+ /**
+  * Splits the base distinct group-by node into a first-stage and a second-stage node.
+  *
+  * The first-stage node is a clone of {@code baseDistinctNode}; the second-stage node is
+  * {@code baseDistinctNode} itself, which is modified in place. Both nodes get their result
+  * column ids set, and the second-stage node and its child group-by nodes get their input
+  * schemas reset.
+  *
+  * @param plan              logical plan used to clone the base node and to generate unique column names
+  * @param originGroupbyNode original group-by node; its output schema defines the result column positions
+  * @param baseDistinctNode  node built from the original group-by; must not be null
+  * @return a two-element array: index 0 is the first-stage node, index 1 is the second-stage node
+  */
+ public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan,
+ GroupbyNode originGroupbyNode,
+ DistinctGroupbyNode baseDistinctNode) {
+ /*
+ Creating 2 stage execution block
+ - first stage: HashAggregation -> groupby distinct column and eval not distinct aggregation
+ ==> HashShuffle
+ - second stage: SortAggregation -> sort and eval(aggregate) with distinct aggregation function, not distinct aggregation
+
+ select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+ -------------------------------------------------------------------------
+ - baseDistinctNode
+ grouping key = col1
+ - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+ - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+ - GroupByNode3: grouping(col1), expr(sum col4)
+ -------------------------------------------------------------------------
+ - FirstStage:
+ - GroupByNode1: grouping(col1, col2)
+ - GroupByNode2: grouping(col1, col3)
+ - GroupByNode3: grouping(col1), expr(sum col4)
+
+ - SecondStage:
+ - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+ - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+ - GroupByNode3: grouping(col1), expr(sum col4)
+ */
+
+ Preconditions.checkNotNull(baseDistinctNode);
+
+ Schema originOutputSchema = originGroupbyNode.getOutSchema();
+ // The first stage works on a clone; the second stage reuses (and mutates) the base node.
+ DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+ DistinctGroupbyNode secondStageDistinctNode = baseDistinctNode;
+
+ List<Column> originGroupColumns = Arrays.asList(firstStageDistinctNode.getGroupingColumns());
+
+ // Indexed by the column's position in the original output schema; the stored value is the
+ // running columnIdIndex at the time the column is encountered below.
+ int[] secondStageColumnIds = new int[secondStageDistinctNode.getOutSchema().size()];
+ int columnIdIndex = 0;
+ for (Column column: secondStageDistinctNode.getGroupingColumns()) {
+ secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex;
+ columnIdIndex++;
+ }
+
+ // Split groupby node into two stage.
+ // - Remove distinct aggregations from FirstStage.
+ // - Change SecondStage's aggregation expr and target column name. For example:
+ // exprs: (sum(default.lineitem.l_quantity (FLOAT8))) ==> exprs: (sum(?sum_3 (FLOAT8)))
+ int grpIdx = 0;
+ for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+ // First- and second-stage child lists are walked in parallel by position.
+ GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(grpIdx);
+
+ if (firstStageGroupbyNode.isDistinct()) {
+ // FirstStage: Remove aggregation, Set target with only grouping columns
+ firstStageGroupbyNode.setAggFunctions(null);
+
+ List<Target> firstGroupbyTargets = new ArrayList<Target>();
+ for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
+ Target target = new Target(new FieldEval(column));
+ firstGroupbyTargets.add(target);
+ }
+ firstStageGroupbyNode.setTargets(firstGroupbyTargets.toArray(new Target[]{}));
+
+ // SecondStage:
+ // Set grouping column with origin groupby's columns
+ // Remove distinct group column from targets
+ secondStageGroupbyNode.setGroupingColumns(originGroupColumns.toArray(new Column[]{}));
+
+ Target[] oldTargets = secondStageGroupbyNode.getTargets();
+ List<Target> secondGroupbyTargets = new ArrayList<Target>();
+ // Only the first aggregation is inspected; all functions in this node share one distinct
+ // column set, since that is how buildBaseDistinctGroupByNode groups them.
+ LinkedHashSet<Column> distinctColumns = EvalTreeUtil.findUniqueColumns(secondStageGroupbyNode.getAggFunctions()[0]);
+ List<Column> uniqueDistinctColumn = new ArrayList<Column>();
+ // remove origin group by column from distinctColumns
+ for (Column eachColumn: distinctColumns) {
+ if (!originGroupColumns.contains(eachColumn)) {
+ uniqueDistinctColumn.add(eachColumn);
+ }
+ }
+ for (int i = 0; i < originGroupColumns.size(); i++) {
+ secondGroupbyTargets.add(oldTargets[i]);
+ // For every child after the first, the running index also advances past the grouping columns.
+ if (grpIdx > 0) {
+ columnIdIndex++;
+ }
+ }
+
+ for (int aggFuncIdx = 0; aggFuncIdx < secondStageGroupbyNode.getAggFunctions().length; aggFuncIdx++) {
+ // Aggregation targets sit after the grouping columns and the extra distinct columns.
+ int targetIdx = originGroupColumns.size() + uniqueDistinctColumn.size() + aggFuncIdx;
+ Target aggFuncTarget = oldTargets[targetIdx];
+ secondGroupbyTargets.add(aggFuncTarget);
+ int outputColumnId = originOutputSchema.getColumnId(aggFuncTarget.getNamedColumn().getQualifiedName());
+ secondStageColumnIds[outputColumnId] = columnIdIndex;
+ columnIdIndex++;
+ }
+ secondStageGroupbyNode.setTargets(secondGroupbyTargets.toArray(new Target[]{}));
+ } else {
+ // FirstStage: Change target of aggFunction to function name expr
+ List<Target> firstGroupbyTargets = new ArrayList<Target>();
+ for (Column column : firstStageDistinctNode.getGroupingColumns()) {
+ firstGroupbyTargets.add(new Target(new FieldEval(column)));
+ columnIdIndex++;
+ }
+
+ int aggFuncIdx = 0;
+ for (AggregationFunctionCallEval aggFunction: firstStageGroupbyNode.getAggFunctions()) {
+ aggFunction.setFirstPhase();
+ String firstEvalNames = plan.generateUniqueColumnName(aggFunction);
+ FieldEval firstEval = new FieldEval(firstEvalNames, aggFunction.getValueType());
+ firstGroupbyTargets.add(new Target(firstEval));
+
+ // The second-stage function now takes the first-stage result column as its argument.
+ AggregationFunctionCallEval secondStageAggFunction = secondStageGroupbyNode.getAggFunctions()[aggFuncIdx];
+ secondStageAggFunction.setArgs(new EvalNode[] {firstEval});
+
+ Target secondTarget = secondStageGroupbyNode.getTargets()[secondStageGroupbyNode.getGroupingColumns().length + aggFuncIdx];
+ int outputColumnId = originOutputSchema.getColumnId(secondTarget.getNamedColumn().getQualifiedName());
+ secondStageColumnIds[outputColumnId] = columnIdIndex;
+ columnIdIndex++;
+ aggFuncIdx++;
+ }
+ firstStageGroupbyNode.setTargets(firstGroupbyTargets.toArray(new Target[]{}));
+ secondStageGroupbyNode.setInSchema(firstStageGroupbyNode.getOutSchema());
+ }
+ grpIdx++;
+ }
+
+ // In the case of distinct query without group by clause
+ // other aggregation function is added to last distinct group by node.
+ // NOTE(review): this leaves the second-stage child list one element shorter than the
+ // first-stage list; size() - 2 assumes at least one distinct child precedes the last node.
+ List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes();
+ GroupbyNode lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 1);
+ if (!lastSecondStageGroupbyNode.isDistinct() && lastSecondStageGroupbyNode.isEmptyGrouping()) {
+ GroupbyNode otherGroupbyNode = lastSecondStageGroupbyNode;
+ lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 2);
+ secondStageGroupbyNodes.remove(secondStageGroupbyNodes.size() - 1);
+
+ Target[] targets =
+ new Target[lastSecondStageGroupbyNode.getTargets().length + otherGroupbyNode.getTargets().length];
+ System.arraycopy(lastSecondStageGroupbyNode.getTargets(), 0,
+ targets, 0, lastSecondStageGroupbyNode.getTargets().length);
+ System.arraycopy(otherGroupbyNode.getTargets(), 0, targets,
+ lastSecondStageGroupbyNode.getTargets().length, otherGroupbyNode.getTargets().length);
+
+ lastSecondStageGroupbyNode.setTargets(targets);
+
+ AggregationFunctionCallEval[] aggFunctions =
+ new AggregationFunctionCallEval[lastSecondStageGroupbyNode.getAggFunctions().length + otherGroupbyNode.getAggFunctions().length];
+ System.arraycopy(lastSecondStageGroupbyNode.getAggFunctions(), 0,
+ aggFunctions, 0, lastSecondStageGroupbyNode.getAggFunctions().length);
+ System.arraycopy(otherGroupbyNode.getAggFunctions(), 0, aggFunctions,
+ lastSecondStageGroupbyNode.getAggFunctions().length, otherGroupbyNode.getAggFunctions().length);
+
+ lastSecondStageGroupbyNode.setAggFunctions(aggFunctions);
+ }
+
+ // Set FirstStage DistinctNode's target with grouping column and other aggregation function
+ List<Integer> firstStageColumnIds = new ArrayList<Integer>();
+ columnIdIndex = 0;
+ List<Target> firstTargets = new ArrayList<Target>();
+ for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+ if (firstStageGroupbyNode.isDistinct()) {
+ for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
+ Target firstTarget = new Target(new FieldEval(column));
+ // A column shared by several children is emitted once, but the index still advances.
+ if (!firstTargets.contains(firstTarget)) {
+ firstTargets.add(firstTarget);
+ firstStageColumnIds.add(columnIdIndex);
+ }
+ columnIdIndex++;
+ }
+ } else {
+ //add aggr function target
+ columnIdIndex += firstStageGroupbyNode.getGroupingColumns().length;
+ Target[] baseGroupbyTargets = firstStageGroupbyNode.getTargets();
+ for (int i = firstStageGroupbyNode.getGroupingColumns().length;
+ i < baseGroupbyTargets.length; i++) {
+ firstTargets.add(baseGroupbyTargets[i]);
+ firstStageColumnIds.add(columnIdIndex++);
+ }
+ }
+ }
+ firstStageDistinctNode.setTargets(firstTargets.toArray(new Target[]{}));
+ firstStageDistinctNode.setResultColumnIds(TUtil.toArray(firstStageColumnIds));
+
+ //Set SecondStage ColumnId and Input schema
+ secondStageDistinctNode.setResultColumnIds(secondStageColumnIds);
+
+ Schema secondStageInSchema = new Schema();
+ //TODO merged tuple schema
+ // NOTE(review): 'index' below is declared but never read in this method.
+ int index = 0;
+ for(GroupbyNode eachNode: secondStageDistinctNode.getGroupByNodes()) {
+ eachNode.setInSchema(firstStageDistinctNode.getOutSchema());
+ for (Column column: eachNode.getOutSchema().getColumns()) {
+ if (secondStageInSchema.getColumn(column) == null) {
+ secondStageInSchema.addColumn(column);
+ }
+ }
+ }
+ secondStageDistinctNode.setInSchema(secondStageInSchema);
+
+ return new DistinctGroupbyNode[]{firstStageDistinctNode, secondStageDistinctNode};
+ }
+
+ /**
+  * Registers the aggregation algorithm for both stages: hash aggregation for the first stage
+  * and sort aggregation for the second stage.
+  *
+  * One {@link SortSpecArray} is produced per first-stage child group-by node, sorted on that
+  * child's grouping columns and tagged with the PID of the corresponding second-stage child.
+  *
+  * @param firstStageBlock         execution block running the first-stage node
+  * @param firstStageDistinctNode  first-stage distinct group-by node
+  * @param secondStageBlock        execution block running the second-stage node
+  * @param secondStageDistinctNode second-stage distinct group-by node
+  */
+ private void setDistinctAggregationEnforcer(
+     ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode,
+     ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode) {
+   firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(),
+       DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+   List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes();
+   List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
+   int index = 0;
+   for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+     List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
+     for (Column column: groupbyNode.getGroupingColumns()) {
+       sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
+     }
+
+     // createMultiPhaseDistinctNode may merge the trailing non-distinct child into the last
+     // distinct child, leaving the second-stage list one element shorter than the first-stage
+     // list. Clamping maps that extra first-stage child to the node that absorbed it.
+     int secondStageIndex = Math.min(index, secondStageGroupbyNodes.size() - 1);
+     sortSpecArrays.add(SortSpecArray.newBuilder()
+         .setPid(secondStageGroupbyNodes.get(secondStageIndex).getPID())
+         .addAllSortSpecs(sortSpecs).build());
+     // Previously never advanced, so every entry carried the PID of the first child.
+     index++;
+   }
+   secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
+       DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays);
+ }
+
+ /**
+  * Accumulator used while building a distinct child group-by node: the node itself plus the
+  * aggregation functions and matching targets assigned to it, in insertion order.
+  */
+ static class DistinctGroupbyNodeBuildInfo {
+   private final GroupbyNode groupbyNode;
+   private final List<AggregationFunctionCallEval> aggFunctions = new ArrayList<AggregationFunctionCallEval>();
+   private final List<Target> aggFunctionTargets = new ArrayList<Target>();
+
+   /**
+    * @param groupbyNode the child group-by node being assembled
+    */
+   public DistinctGroupbyNodeBuildInfo(GroupbyNode groupbyNode) {
+     this.groupbyNode = groupbyNode;
+   }
+
+   /** Records an aggregation function for the node. */
+   public void addAggFunction(AggregationFunctionCallEval aggFunction) {
+     aggFunctions.add(aggFunction);
+   }
+
+   /** Records the target of an aggregation function for the node. */
+   public void addAggFunctionTarget(Target target) {
+     aggFunctionTargets.add(target);
+   }
+
+   /** @return the child group-by node being assembled */
+   public GroupbyNode getGroupbyNode() {
+     return groupbyNode;
+   }
+
+   /** @return the live list of recorded aggregation functions */
+   public List<AggregationFunctionCallEval> getAggFunctions() {
+     return aggFunctions;
+   }
+
+   /** @return the live list of recorded aggregation targets */
+   public List<Target> getAggFunctionTargets() {
+     return aggFunctionTargets;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
new file mode 100644
index 0000000..b1e4bc3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Logical node for a group-by that contains distinct aggregations. It holds the overall
+ * grouping columns and targets plus a list of child {@link GroupbyNode}s.
+ */
+public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable {
+  /** Child group-by nodes evaluated by this node. */
+  @Expose
+  private List<GroupbyNode> groupByNodes;
+
+  /** Output targets; setting them also resets the output schema. */
+  @Expose
+  private Target[] targets;
+
+  /** Grouping columns of the overall group-by; may be null or empty. */
+  @Expose
+  private Column[] groupingColumns;
+
+  /** Result column ids assigned by the planner. */
+  @Expose
+  private int[] resultColumnIds;
+
+  public DistinctGroupbyNode(int pid) {
+    super(pid, NodeType.DISTINCT_GROUP_BY);
+  }
+
+  @Override
+  public boolean hasTargets() {
+    return targets != null && targets.length > 0;
+  }
+
+  /**
+   * Sets the targets and derives the output schema from them.
+   */
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+    setOutSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  /**
+   * Returns the targets previously set, or an empty array when none are set.
+   *
+   * This used to return an empty array unconditionally, which contradicted
+   * {@link #hasTargets()} and hid the targets stored by {@link #setTargets(Target[])}.
+   */
+  @Override
+  public Target[] getTargets() {
+    return hasTargets() ? targets : new Target[0];
+  }
+
+  public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
+    this.groupByNodes = groupByNodes;
+  }
+
+  public List<GroupbyNode> getGroupByNodes() {
+    return groupByNodes;
+  }
+
+  public final Column[] getGroupingColumns() {
+    return groupingColumns;
+  }
+
+  public final void setGroupColumns(Column[] groupingColumns) {
+    this.groupingColumns = groupingColumns;
+  }
+
+  public int[] getResultColumnIds() {
+    return resultColumnIds;
+  }
+
+  public void setResultColumnIds(int[] resultColumnIds) {
+    this.resultColumnIds = resultColumnIds;
+  }
+
+  /**
+   * Copies this node. Child group-by nodes and targets are cloned individually; cloned
+   * children get PID -1. The grouping-column and result-column-id arrays are copied so the
+   * clone does not share them with the original.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
+
+    if (groupingColumns != null) {
+      cloneNode.groupingColumns = groupingColumns.clone();
+    }
+
+    if (resultColumnIds != null) {
+      // Previously shared with the original through the shallow super.clone().
+      cloneNode.resultColumnIds = resultColumnIds.clone();
+    }
+
+    if (groupByNodes != null) {
+      cloneNode.groupByNodes = new ArrayList<GroupbyNode>();
+      for (GroupbyNode eachNode: groupByNodes) {
+        GroupbyNode groupbyNode = (GroupbyNode)eachNode.clone();
+        groupbyNode.setPID(-1);
+        cloneNode.groupByNodes.add(groupbyNode);
+      }
+    }
+
+    if (targets != null) {
+      cloneNode.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        cloneNode.targets[i] = (Target) targets[i].clone();
+      }
+    }
+
+    return cloneNode;
+  }
+
+  public final boolean isEmptyGrouping() {
+    return groupingColumns == null || groupingColumns.length == 0;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Distinct GroupBy (");
+    // Was '||', which dereferenced groupingColumns precisely when it was null.
+    if (groupingColumns != null && groupingColumns.length > 0) {
+      sb.append("grouping set=").append(TUtil.arrayToString(groupingColumns));
+      sb.append(", ");
+    }
+    if (groupByNodes != null) {
+      for (GroupbyNode eachNode: groupByNodes) {
+        sb.append(", groupbyNode=").append(eachNode.toString());
+      }
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof DistinctGroupbyNode) {
+      DistinctGroupbyNode other = (DistinctGroupbyNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+      eq = eq && TUtil.checkEquals(groupByNodes, other.groupByNodes);
+      eq = eq && TUtil.checkEquals(targets, other.targets);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Builds the plan description: grouping columns as the title, then the aggregation
+   * expressions of all children, the target list, the schemas and one detail line per child.
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("(");
+    Column [] groupingColumns = this.groupingColumns;
+    // Grouping columns may be unset (see isEmptyGrouping); print "()" in that case.
+    if (groupingColumns != null) {
+      for (int j = 0; j < groupingColumns.length; j++) {
+        sb.append(groupingColumns[j].getSimpleName());
+        if(j < groupingColumns.length - 1) {
+          sb.append(",");
+        }
+      }
+    }
+
+    sb.append(")");
+
+    planStr.appendTitle(sb.toString());
+
+    sb = new StringBuilder();
+    sb.append("(");
+
+    String prefix = "";
+    for (GroupbyNode eachNode: groupByNodes) {
+      if (eachNode.hasAggFunctions()) {
+        AggregationFunctionCallEval[] aggrFunctions = eachNode.getAggFunctions();
+        for (int j = 0; j < aggrFunctions.length; j++) {
+          sb.append(prefix).append(aggrFunctions[j]);
+          prefix = ",";
+        }
+      }
+    }
+    sb.append(")");
+    planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
+    sb = new StringBuilder("target list: ");
+    if (targets != null) {
+      for (int i = 0; i < targets.length; i++) {
+        sb.append(targets[i]);
+        if( i < targets.length - 1) {
+          sb.append(", ");
+        }
+      }
+    }
+    planStr.addExplan(sb.toString());
+
+    planStr.addDetail("out schema:").appendDetail(getOutSchema().toString());
+    planStr.addDetail("in schema:").appendDetail(getInSchema().toString());
+
+    for (GroupbyNode eachNode: groupByNodes) {
+      planStr.addDetail("\t").appendDetail("distinct: " + eachNode.isDistinct())
+          .appendDetail(", " + eachNode.getShortPlanString());
+    }
+
+    return planStr;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index bafe0c6..828b06d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -148,6 +148,48 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
return grp;
}
+ /**
+  * Renders this node on a single line: node type and PID, the grouping columns, the
+  * aggregation expressions (if any), the target list (if set) and the out/in schemas.
+  *
+  * @return the one-line description
+  */
+ public String getShortPlanString() {
+   StringBuilder out = new StringBuilder();
+   out.append(getType().name()).append("(").append(getPID()).append(")").append("(");
+
+   Column [] columns = this.groupingColumns;
+   String separator = "";
+   for (Column column : columns) {
+     out.append(separator).append(column.getSimpleName());
+     separator = ",";
+   }
+   out.append(")");
+
+   // there can be no aggregation functions
+   if (hasAggFunctions()) {
+     out.append(", exprs: (");
+     separator = "";
+     for (int k = 0; k < aggrFunctions.length; k++) {
+       out.append(separator).append(aggrFunctions[k]);
+       separator = ",";
+     }
+     out.append(")");
+   }
+
+   if (targets != null) {
+     out.append(", target list:{");
+     separator = "";
+     for (int k = 0; k < targets.length; k++) {
+       out.append(separator).append(targets[k]);
+       separator = ", ";
+     }
+     out.append("}");
+   }
+
+   out.append(", out schema:").append(getOutSchema().toString());
+   out.append(", in schema:").append(getInSchema().toString());
+
+   return out.toString();
+ }
+
@Override
public PlanString getPlanString() {
PlanString planStr = new PlanString(this);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index f498231..cc43912 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -46,6 +46,7 @@ public enum NodeType {
BST_INDEX_SCAN(IndexScanNode.class),
STORE(StoreTableNode.class),
INSERT(InsertNode.class),
+ DISTINCT_GROUP_BY(DistinctGroupbyNode.class),
CREATE_DATABASE(CreateDatabaseNode.class),
DROP_DATABASE(DropDatabaseNode.class),
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
new file mode 100644
index 0000000..6458f47
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+ /**
+  * Physical operator that evaluates every {@link GroupbyNode} held by a
+  * {@link DistinctGroupbyNode} with in-memory hash tables and merges the per-node
+  * results into output tuples.
+  *
+  * The first call to {@link #next()} drains the child operator into one
+  * {@link HashAggregator} per group-by node. Later calls advance all aggregators in
+  * lock step, one outer grouping key at a time, and emit the merged rows of that key.
+  */
+ public class DistinctGroupbyHashAggregationExec extends PhysicalExec {
+   private DistinctGroupbyNode plan;
+   private boolean finished = false;
+
+   // One aggregator per GroupbyNode of the plan, in the order returned by getGroupByNodes().
+   private HashAggregator[] hashAggregators;
+   private PhysicalExec child;
+   private int distinctGroupingKeyNum;
+   // Column ids (in the input schema) of the plan's grouping columns.
+   private int distinctGroupingKeyIds[];
+   // True until the child has been loaded into the hash tables.
+   private boolean first = true;
+   private int groupbyNodeNum;
+   private int outputColumnNum;
+   // Number of outer groups held by the first aggregator; used only for progress reporting.
+   private int totalNumRows;
+   // Number of outer groups fetched so far; used only for progress reporting.
+   private int fetchedRows;
+   // 0.5 once the child has been loaded, 1.0 once the output is exhausted.
+   private float progress;
+
+   // Maps a position in the concatenation of all group-by node outputs to the output
+   // column index, or -1 when that position is not part of the result.
+   private int[] resultColumnIdIndexes;
+
+   /**
+    * Creates the operator, initializes the child and builds one {@link HashAggregator}
+    * per group-by node of the plan.
+    *
+    * @param context task context, also polled for the stop flag while loading the child
+    * @param plan    logical node describing the grouping columns and the group-by nodes
+    * @param subOp   child operator supplying the input tuples
+    * @throws IOException if initializing the child fails
+    */
+   public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
+       throws IOException {
+     super(context, plan.getInSchema(), plan.getOutSchema());
+
+     // Keep the plan reference, as DistinctGroupbySortAggregationExec does.
+     this.plan = plan;
+     this.child = subOp;
+     this.child.init();
+
+     Column[] keyColumns = plan.getGroupingColumns();
+     distinctGroupingKeyNum = keyColumns.length;
+     distinctGroupingKeyIds = new int[distinctGroupingKeyNum];
+     for (int idx = 0; idx < keyColumns.length; idx++) {
+       distinctGroupingKeyIds[idx] = inSchema.getColumnId(keyColumns[idx].getQualifiedName());
+     }
+
+     List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+     groupbyNodeNum = groupbyNodes.size();
+     this.hashAggregators = new HashAggregator[groupbyNodeNum];
+
+     int index = 0;
+     int allGroupbyOutColNum = 0;
+     for (GroupbyNode eachGroupby : groupbyNodes) {
+       hashAggregators[index++] = new HashAggregator(eachGroupby);
+       allGroupbyOutColNum += eachGroupby.getOutSchema().size();
+     }
+
+     outputColumnNum = plan.getOutSchema().size();
+
+     // Every position starts as "not part of the result" ...
+     resultColumnIdIndexes = new int[allGroupbyOutColNum];
+     for (int i = 0; i < allGroupbyOutColNum; i++) {
+       resultColumnIdIndexes[i] = -1;
+     }
+
+     // ... and the positions named by the plan are pointed at their output column.
+     int[] resultColumnIds = plan.getResultColumnIds();
+     for (int i = 0; i < resultColumnIds.length; i++) {
+       resultColumnIdIndexes[resultColumnIds[i]] = i;
+     }
+   }
+
+   // Merged rows of the outer group currently being emitted, and the read cursor into them.
+   List<Tuple> currentAggregatedTuples = null;
+   int currentAggregatedTupleIndex = 0;
+   int currentAggregatedTupleSize = 0;
+
+   /**
+    * Returns the next merged output tuple, or {@code null} when the output is exhausted.
+    *
+    * On the first call the whole child is loaded into the hash tables. Rows of the
+    * current outer group are served from a buffer; when the buffer is drained the next
+    * entry is taken from every aggregator and merged row by row.
+    *
+    * @throws IOException if reading from the child fails
+    */
+   @Override
+   public Tuple next() throws IOException {
+     if (finished) {
+       return null;
+     }
+     if (first) {
+       loadChildHashTable();
+       progress = 0.5f;
+       first = false;
+     }
+
+     if (currentAggregatedTuples != null && currentAggregatedTupleIndex < currentAggregatedTupleSize) {
+       return currentAggregatedTuples.get(currentAggregatedTupleIndex++);
+     }
+
+     int nullCount = 0;
+
+     //--------------------------------------------------------------------------------------
+     // Output tuple
+     //--------------------------------------------------------------------------------------
+     //                hashAggregators[0]   hashAggregators[1]    hashAggregators[2]
+     //--------------------------------------------------------------------------------------
+     // Groupby_Key1 | Distinct1_Column_V1 | Distinct2_Column_Va | Other_Aggregation_Result |
+     // Groupby_Key1 | Distinct1_Column_V2 | Distinct2_Column_Vb |                          |
+     // Groupby_Key1 |                     | Distinct2_Column_Vc |                          |
+     // Groupby_Key1 |                     | Distinct2_Column_Vd |                          |
+     //--------------------------------------------------------------------------------------
+     // Groupby_Key2 | Distinct1_Column_V1 | Distinct2_Column_Vk | Other_Aggregation_Result |
+     // Groupby_Key2 | Distinct1_Column_V2 | Distinct2_Column_Vn |                          |
+     // Groupby_Key2 | Distinct1_Column_V3 |                     |                          |
+     //--------------------------------------------------------------------------------------
+
+     // NOTE(review): each iterator is advanced independently, so this merge assumes that all
+     // hash tables iterate their outer keys in the same order -- confirm.
+     List<List<Tuple>> tupleSlots = new ArrayList<List<Tuple>>(hashAggregators.length);
+     for (int i = 0; i < hashAggregators.length; i++) {
+       if (!hashAggregators[i].iterator.hasNext()) {
+         nullCount++;
+         // Keep slot i aligned with aggregator i; the merge below indexes tupleSlots by i.
+         tupleSlots.add(Collections.<Tuple>emptyList());
+         continue;
+       }
+       Entry<Tuple, Map<Tuple, FunctionContext[]>> entry = hashAggregators[i].iterator.next();
+       tupleSlots.add(hashAggregators[i].aggregate(entry.getValue()));
+     }
+
+     if (nullCount == hashAggregators.length) {
+       finished = true;
+       progress = 1.0f;
+       return null;
+     }
+
+     currentAggregatedTuples = new ArrayList<Tuple>();
+     int listIndex = 0;
+     while (true) {
+       // Row number listIndex of every aggregator; null where an aggregator has fewer rows.
+       Tuple[] tuples = new Tuple[hashAggregators.length];
+       for (int i = 0; i < hashAggregators.length; i++) {
+         List<Tuple> aggregatedTuples = tupleSlots.get(i);
+         if (aggregatedTuples.size() > listIndex) {
+           tuples[i] = aggregatedTuples.get(listIndex);
+         }
+       }
+
+       //merge
+       Tuple mergedTuple = new VTuple(outputColumnNum);
+       int mergeTupleIndex = 0;
+
+       boolean allNull = true;
+       for (int i = 0; i < hashAggregators.length; i++) {
+         if (tuples[i] != null) {
+           allNull = false;
+         }
+
+         int tupleSize = hashAggregators[i].getTupleSize();
+         for (int j = 0; j < tupleSize; j++) {
+           if (resultColumnIdIndexes[mergeTupleIndex] >= 0) {
+             if (tuples[i] != null) {
+               mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], tuples[i].get(j));
+             } else {
+               mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], NullDatum.get());
+             }
+           }
+           mergeTupleIndex++;
+         }
+       }
+
+       // No aggregator had a row at this index: the current group is complete.
+       if (allNull) {
+         break;
+       }
+
+       currentAggregatedTuples.add(mergedTuple);
+       listIndex++;
+     }
+
+     currentAggregatedTupleIndex = 0;
+     currentAggregatedTupleSize = currentAggregatedTuples.size();
+
+     if (currentAggregatedTupleSize == 0) {
+       finished = true;
+       progress = 1.0f;
+       return null;
+     }
+
+     // Counted once per outer group, matching the unit of totalNumRows.
+     fetchedRows++;
+     return currentAggregatedTuples.get(currentAggregatedTupleIndex++);
+   }
+
+   /**
+    * Feeds every child tuple to every aggregator until the child is exhausted or the
+    * task context reports that it was stopped, then positions all iterators at the start.
+    */
+   private void loadChildHashTable() throws IOException {
+     Tuple tuple = null;
+     while (!context.isStopped() && (tuple = child.next()) != null) {
+       for (int i = 0; i < hashAggregators.length; i++) {
+         hashAggregators[i].compute(tuple);
+       }
+     }
+     for (int i = 0; i < hashAggregators.length; i++) {
+       hashAggregators[i].initFetch();
+     }
+
+     totalNumRows = hashAggregators[0].hashTable.size();
+   }
+
+   /** Releases the hash tables and closes the child operator. */
+   @Override
+   public void close() throws IOException {
+     plan = null;
+     if (hashAggregators != null) {
+       for (int i = 0; i < hashAggregators.length; i++) {
+         hashAggregators[i].close();
+       }
+     }
+     if (child != null) {
+       child.close();
+     }
+   }
+
+   /** No-op: the child is already initialized by the constructor. */
+   @Override
+   public void init() throws IOException {
+   }
+
+   /**
+    * Restarts the output from the first group. The hash tables are kept, so the child
+    * is not read again.
+    */
+   @Override
+   public void rescan() throws IOException {
+     finished = false;
+     // Drop rows buffered from the previous pass so they are not replayed.
+     currentAggregatedTuples = null;
+     currentAggregatedTupleIndex = 0;
+     currentAggregatedTupleSize = 0;
+     fetchedRows = 0;
+     if (!first) {
+       // Loading is already done; only the fetch half of the progress restarts.
+       progress = 0.5f;
+     }
+     for (int i = 0; i < hashAggregators.length; i++) {
+       hashAggregators[i].initFetch();
+     }
+   }
+
+   /**
+    * Returns the stored progress when finished or when no group count is known;
+    * otherwise adds up to 0.5 in proportion to the groups fetched so far.
+    */
+   @Override
+   public float getProgress() {
+     if (finished) {
+       return progress;
+     } else {
+       if (totalNumRows > 0) {
+         return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f;
+       } else {
+         return progress;
+       }
+     }
+   }
+
+   /** Returns the child's input statistics, or {@code null} when there is no child. */
+   @Override
+   public TableStats getInputStats() {
+     if (child != null) {
+       return child.getInputStats();
+     } else {
+       return null;
+     }
+   }
+
+   /**
+    * Hash-based aggregation state for a single {@link GroupbyNode}. Tuples are grouped
+    * first by the outer (distinct) grouping key and then by this node's own grouping key.
+    */
+   class HashAggregator {
+     // Outer's GroupBy Key -> Each GroupByNode's Key -> FunctionContext
+     private Map<Tuple, Map<Tuple, FunctionContext[]>> hashTable;
+     private Iterator<Entry<Tuple, Map<Tuple, FunctionContext[]>>> iterator = null;
+
+     // Input column ids of this node's key: the outer key ids first, then the node's
+     // remaining grouping columns.
+     private int groupingKeyIds[];
+     private final int aggFunctionsNum;
+     private final AggregationFunctionCallEval aggFunctions[];
+
+     private Schema evalSchema;
+
+     private GroupbyNode groupbyNode;
+
+     // Width of the tuples produced by aggregate(): key columns plus aggregation results.
+     int tupleSize;
+
+     /**
+      * Resolves the grouping key column ids and the aggregation functions of the node.
+      *
+      * @param groupbyNode group-by node this aggregator evaluates
+      */
+     public HashAggregator(GroupbyNode groupbyNode) throws IOException {
+       this.groupbyNode = groupbyNode;
+
+       hashTable = new HashMap<Tuple, Map<Tuple, FunctionContext[]>>(10000);
+       evalSchema = groupbyNode.getOutSchema();
+
+       List<Integer> distinctGroupingKeyIdSet = new ArrayList<Integer>();
+       for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
+         distinctGroupingKeyIdSet.add(distinctGroupingKeyIds[i]);
+       }
+
+       // Start from the outer key ids and append only the node's columns not already present.
+       List<Integer> groupingKeyIdList = new ArrayList<Integer>(distinctGroupingKeyIdSet);
+       Column[] keyColumns = groupbyNode.getGroupingColumns();
+       for (int idx = 0; idx < keyColumns.length; idx++) {
+         int keyIndex = inSchema.getColumnId(keyColumns[idx].getQualifiedName());
+         if (!distinctGroupingKeyIdSet.contains(keyIndex)) {
+           groupingKeyIdList.add(keyIndex);
+         }
+       }
+       int index = 0;
+       groupingKeyIds = new int[groupingKeyIdList.size()];
+       for (Integer eachId : groupingKeyIdList) {
+         groupingKeyIds[index++] = eachId;
+       }
+
+       if (groupbyNode.hasAggFunctions()) {
+         aggFunctions = groupbyNode.getAggFunctions();
+         aggFunctionsNum = aggFunctions.length;
+       } else {
+         aggFunctions = new AggregationFunctionCallEval[0];
+         aggFunctionsNum = 0;
+       }
+
+       tupleSize = groupingKeyIds.length + aggFunctionsNum;
+     }
+
+     /** Returns the number of columns in each tuple produced by {@link #aggregate(Map)}. */
+     public int getTupleSize() {
+       return tupleSize;
+     }
+
+     /**
+      * Folds one input tuple into the hash table: the entry is located by the outer
+      * grouping key and then by this node's key, and every aggregation function is
+      * merged into the function contexts stored there.
+      *
+      * @param tuple input tuple laid out according to the input schema
+      */
+     public void compute(Tuple tuple) throws IOException {
+       Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length);
+       for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
+         outerKeyTuple.put(i, tuple.get(distinctGroupingKeyIds[i]));
+       }
+
+       Tuple keyTuple = new VTuple(groupingKeyIds.length);
+       for (int i = 0; i < groupingKeyIds.length; i++) {
+         keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+       }
+
+       Map<Tuple, FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple);
+       if (distinctEntry == null) {
+         distinctEntry = new HashMap<Tuple, FunctionContext[]>();
+         hashTable.put(outerKeyTuple, distinctEntry);
+       }
+       FunctionContext[] contexts = distinctEntry.get(keyTuple);
+       if (contexts != null) {
+         for (int i = 0; i < aggFunctions.length; i++) {
+           aggFunctions[i].merge(contexts[i], inSchema, tuple);
+         }
+       } else { // first occurrence of this key
+         contexts = new FunctionContext[aggFunctionsNum];
+         for (int i = 0; i < aggFunctionsNum; i++) {
+           contexts[i] = aggFunctions[i].newContext();
+           aggFunctions[i].merge(contexts[i], inSchema, tuple);
+         }
+         distinctEntry.put(keyTuple, contexts);
+       }
+     }
+
+     /** Positions the iterator at the first outer group of the hash table. */
+     public void initFetch() {
+       iterator = hashTable.entrySet().iterator();
+     }
+
+     /**
+      * Produces one result tuple per entry of the given group: the key columns followed
+      * by the terminated value of each aggregation function.
+      *
+      * @param groupTuples entries of one outer group, keyed by this node's grouping key
+      * @return the result tuples, in the iteration order of {@code groupTuples}
+      */
+     public List<Tuple> aggregate(Map<Tuple, FunctionContext[]> groupTuples) {
+       List<Tuple> aggregatedTuples = new ArrayList<Tuple>();
+
+       for (Entry<Tuple, FunctionContext[]> entry : groupTuples.entrySet()) {
+         Tuple tuple = new VTuple(groupingKeyIds.length + aggFunctionsNum);
+         Tuple groupbyKey = entry.getKey();
+         int index = 0;
+         for (; index < groupbyKey.size(); index++) {
+           tuple.put(index, groupbyKey.get(index));
+         }
+
+         FunctionContext[] contexts = entry.getValue();
+         for (int i = 0; i < aggFunctionsNum; i++, index++) {
+           tuple.put(index, aggFunctions[i].terminate(contexts[i]));
+         }
+         aggregatedTuples.add(tuple);
+       }
+       return aggregatedTuples;
+     }
+
+     /** Releases the hash table; safe to call more than once. */
+     public void close() throws IOException {
+       if (hashTable != null) {
+         hashTable.clear();
+         hashTable = null;
+       }
+       iterator = null;
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
new file mode 100644
index 0000000..c8457ac
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+ /**
+  * Physical operator that pulls one tuple at a time from several
+  * {@link SortAggregateExec}s, one per {@link GroupbyNode} of a
+  * {@link DistinctGroupbyNode}, and merges them into a single output tuple.
+  */
+ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
+   private DistinctGroupbyNode plan;
+   private SortAggregateExec[] aggregateExecs;
+
+   private boolean finished = false;
+
+   private int distinctGroupingKeyNum;
+
+   // Most recent tuple fetched from each aggregate exec, indexed like aggregateExecs.
+   private Tuple[] currentTuples;
+   private int outColumnNum;
+   private int groupbyNodeNum;
+
+   // Maps a position in the concatenation of all group-by node outputs to the output
+   // column index, or -1 when that position is not part of the result.
+   private int[] resultColumnIdIndexes;
+
+   /**
+    * Builds the column mapping from the plan and initializes every aggregate exec.
+    *
+    * @param context        task context handed to the parent operator
+    * @param plan           logical node describing the group-by nodes and result columns
+    * @param aggregateExecs sort-based aggregation operators whose outputs are merged
+    * @throws IOException if initializing an aggregate exec fails
+    */
+   public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, DistinctGroupbyNode plan,
+       SortAggregateExec[] aggregateExecs) throws IOException {
+     super(context, plan.getInSchema(), plan.getOutSchema());
+     this.plan = plan;
+     this.aggregateExecs = aggregateExecs;
+     this.groupbyNodeNum = plan.getGroupByNodes().size();
+
+     final Column[] keyColumns = plan.getGroupingColumns();
+     this.distinctGroupingKeyNum = keyColumns.length;
+
+     this.outColumnNum = outSchema.size();
+     this.currentTuples = new Tuple[groupbyNodeNum];
+
+     // Total width of all group-by node outputs laid side by side.
+     int concatenatedWidth = 0;
+     for (GroupbyNode node : plan.getGroupByNodes()) {
+       concatenatedWidth += node.getOutSchema().size();
+     }
+
+     // Mark every position as unused first, then point the selected ones at their target.
+     this.resultColumnIdIndexes = new int[concatenatedWidth];
+     for (int pos = concatenatedWidth - 1; pos >= 0; pos--) {
+       resultColumnIdIndexes[pos] = -1;
+     }
+
+     final int[] selected = plan.getResultColumnIds();
+     for (int outIdx = 0; outIdx < selected.length; outIdx++) {
+       resultColumnIdIndexes[selected[outIdx]] = outIdx;
+     }
+
+     for (int k = 0; k < aggregateExecs.length; k++) {
+       aggregateExecs[k].init();
+     }
+   }
+
+   // True until the first call to next() has fetched from every aggregate exec.
+   boolean first = true;
+
+   /**
+    * Fetches the next tuple from every aggregate exec and merges them.
+    *
+    * @return the merged tuple, or {@code null} once every aggregate exec returned
+    *         {@code null}
+    * @throws IOException if an aggregate exec fails
+    */
+   @Override
+   public Tuple next() throws IOException {
+     if (finished) {
+       return null;
+     }
+
+     boolean sawTuple = false;
+     for (int idx = 0; idx < groupbyNodeNum; idx++) {
+       final SortAggregateExec exec = aggregateExecs[idx];
+       if (first && idx > 0) {
+         // All SortAggregateExec instances share the same SeqScanExec object, so once an
+         // earlier exec has run its sort, each later one must be rescanned before use.
+         exec.rescan();
+       }
+       final Tuple fetched = exec.next();
+       currentTuples[idx] = fetched;
+       if (fetched != null) {
+         sawTuple = true;
+       }
+     }
+     first = false;
+
+     if (!sawTuple) {
+       finished = true;
+       return null;
+     }
+
+     final Tuple merged = new VTuple(outColumnNum);
+
+     // flatPos walks the concatenated layout across all fetched tuples.
+     int flatPos = 0;
+     for (int idx = 0; idx < currentTuples.length; idx++) {
+       final int width = currentTuples[idx].size();
+       for (int col = 0; col < width; col++, flatPos++) {
+         if (resultColumnIdIndexes[flatPos] >= 0) {
+           merged.put(resultColumnIdIndexes[flatPos], currentTuples[idx].get(col));
+         }
+       }
+     }
+
+     return merged;
+   }
+
+   /** Drops the plan reference and closes every aggregate exec. */
+   @Override
+   public void close() throws IOException {
+     plan = null;
+     if (aggregateExecs == null) {
+       return;
+     }
+     for (int k = 0; k < aggregateExecs.length; k++) {
+       aggregateExecs[k].close();
+     }
+   }
+
+   /** No-op: the aggregate execs are initialized by the constructor. */
+   @Override
+   public void init() throws IOException {
+   }
+
+   /** Clears the finished flag and rescans every aggregate exec. */
+   @Override
+   public void rescan() throws IOException {
+     finished = false;
+     int idx = 0;
+     while (idx < groupbyNodeNum) {
+       aggregateExecs[idx].rescan();
+       idx++;
+     }
+   }
+
+   /** Returns 1.0 when finished, otherwise the progress of the last aggregate exec. */
+   @Override
+   public float getProgress() {
+     return finished ? 1.0f : aggregateExecs[aggregateExecs.length - 1].getProgress();
+   }
+
+   /** Returns the input statistics reported by the last aggregate exec. */
+   @Override
+   public TableStats getInputStats() {
+     final SortAggregateExec lastExec = aggregateExecs[aggregateExecs.length - 1];
+     return lastExec.getInputStats();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index c422b49..f714758 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -758,8 +758,9 @@ public class ExternalSortExec extends SortExec {
public void rescan() throws IOException {
if (result != null) {
result.reset();
- progress = 0.5f;
}
+ super.rescan();
+ progress = 0.5f;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index a31ad90..c87e01a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -54,14 +54,14 @@ public class HashAggregateExec extends AggregationExec {
for(int i = 0; i < groupingKeyIds.length; i++) {
keyTuple.put(i, tuple.get(groupingKeyIds[i]));
}
-
- if(hashTable.containsKey(keyTuple)) {
- FunctionContext [] contexts = hashTable.get(keyTuple);
+
+ FunctionContext [] contexts = hashTable.get(keyTuple);
+ if(contexts != null) {
for(int i = 0; i < aggFunctions.length; i++) {
aggFunctions[i].merge(contexts[i], inSchema, tuple);
}
} else { // if the key occurs firstly
- FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+ contexts = new FunctionContext[aggFunctionsNum];
for(int i = 0; i < aggFunctionsNum; i++) {
contexts[i] = aggFunctions[i].newContext();
aggFunctions[i].merge(contexts[i], inSchema, tuple);