You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/24 07:58:13 UTC
[2/2] git commit: TAJO-197: Implement Enforcer that forces physical
planner to choose specified algorithms. (hyunsik)
TAJO-197: Implement Enforcer that forces physical planner to choose specified algorithms. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/17287ef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/17287ef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/17287ef5
Branch: refs/heads/master
Commit: 17287ef58457da7766a78d2655cd332adbc56150
Parents: 3e82159
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Sep 24 14:57:54 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Sep 24 14:57:54 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/tajo/catalog/SortSpec.java | 25 +-
.../src/main/proto/CatalogProtos.proto | 8 +-
.../org/apache/tajo/TaskAttemptContext.java | 10 +
.../engine/planner/PhysicalPlannerImpl.java | 312 +++++++--
.../planner/PhysicalPlanningException.java | 31 +
.../apache/tajo/engine/planner/PlannerUtil.java | 4 +-
.../tajo/engine/planner/enforce/Enforcer.java | 166 +++++
.../engine/planner/physical/BNLJoinExec.java | 10 +-
.../planner/physical/ExternalSortExec.java | 3 +-
.../engine/planner/physical/MemSortExec.java | 3 +-
.../planner/physical/UnaryPhysicalExec.java | 4 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 31 +-
.../ipc/protocolrecords/QueryUnitRequest.java | 2 +
.../org/apache/tajo/master/ExecutionBlock.java | 10 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 5 +-
.../main/java/org/apache/tajo/worker/Task.java | 1 +
.../src/main/proto/TajoWorkerProtocol.proto | 73 +-
.../planner/physical/TestBNLJoinExec.java | 67 +-
.../planner/physical/TestExternalSortExec.java | 2 +
.../planner/physical/TestHashAntiJoinExec.java | 2 +
.../planner/physical/TestHashJoinExec.java | 38 +-
.../planner/physical/TestHashSemiJoinExec.java | 2 +
.../planner/physical/TestMergeJoinExec.java | 71 +-
.../engine/planner/physical/TestNLJoinExec.java | 3 +
.../planner/physical/TestPhysicalPlanner.java | 131 +++-
.../engine/planner/physical/TestSortExec.java | 5 +-
.../apache/tajo/engine/query/TestSortQuery.java | 1 +
.../tajo/worker/TestRangeRetrieverHandler.java | 9 +-
.../apache/tajo/storage/TupleComparator.java | 27 +-
.../apache/tajo/storage/index/IndexProtos.java | 665 -------------------
.../apache/tajo/storage/index/bst/BSTIndex.java | 3 +-
.../src/main/proto/IndexProtos.proto | 2 +-
33 files changed, 812 insertions(+), 917 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 53a6c07..9400154 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,9 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-197: Implement Enforcer that forces physical planner to choose
+ specified algorithms. (hyunsik)
+
TAJO-194: LogicalNode should have an identifier to distinguish each
logical node instance. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
index be73eb3..5cc0de1 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
@@ -19,11 +19,14 @@
package org.apache.tajo.catalog;
import com.google.gson.annotations.Expose;
-import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import static org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
-public class SortSpec implements Cloneable, GsonObject {
+
+public class SortSpec implements Cloneable, GsonObject, ProtoObject<SortSpecProto> {
@Expose private Column sortKey;
@Expose private boolean ascending = true;
@Expose private boolean nullFirst = false;
@@ -45,6 +48,12 @@ public class SortSpec implements Cloneable, GsonObject {
this.nullFirst = nullFirst;
}
+ public SortSpec(SortSpecProto sortSpec) {
+ this.sortKey = new Column(sortSpec.getColumn());
+ this.ascending = sortSpec.getAscending();
+ this.nullFirst = sortSpec.getNullFirst();
+ }
+
public final boolean isAscending() {
return this.ascending;
}
@@ -93,7 +102,15 @@ public class SortSpec implements Cloneable, GsonObject {
}
public String toString() {
- return "Sortkey (key="+sortKey
- + " "+(ascending ? "asc" : "desc")+")";
+ return "Sortkey (key="+sortKey + " "+(ascending ? "asc" : "desc")+")";
+ }
+
+ @Override
+ public SortSpecProto getProto() {
+ SortSpecProto.Builder builder = SortSpecProto.newBuilder();
+ builder.setColumn(sortKey.getProto());
+ builder.setAscending(ascending);
+ builder.setNullFirst(nullFirst);
+ return builder.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 477362c..6ef7613 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -207,8 +207,14 @@ message ServerNameProto {
required int32 port = 3;
}
+message TupleComparatorSpecProto {
+ required int32 columnId = 1;
+ optional bool ascending = 2 [default = true];
+ optional bool nullFirst = 3 [default = false];
+}
+
message SortSpecProto {
- required int32 sortColumnId = 1;
+ required ColumnProto column = 1;
optional bool ascending = 2 [default = true];
optional bool nullFirst = 3 [default = false];
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
index 7eac43d..32c06cc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.storage.Fragment;
import java.io.File;
@@ -55,6 +56,7 @@ public class TaskAttemptContext {
private boolean interQuery = false;
private Path outputPath;
private DataChannel dataChannel;
+ private Enforcer enforcer;
public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
final Fragment[] fragments,
@@ -99,6 +101,14 @@ public class TaskAttemptContext {
return dataChannel;
}
+ public void setEnforcer(Enforcer enforcer) {
+ this.enforcer = enforcer;
+ }
+
+ public Enforcer getEnforcer() {
+ return this.enforcer;
+ }
+
public boolean hasResultStats() {
return resultStats != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 4813fd7..ddedd80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -31,6 +31,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
@@ -40,8 +41,12 @@ import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.IndexUtil;
import java.io.IOException;
+import java.util.List;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
@@ -63,7 +68,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
try {
execPlan = createPlanRecursive(context, logicalPlan);
- if (execPlan instanceof StoreTableExec || execPlan instanceof IndexedStoreExec
+ if (execPlan instanceof StoreTableExec
+ || execPlan instanceof IndexedStoreExec
|| execPlan instanceof PartitionedStoreExec) {
return execPlan;
} else if (context.getDataChannel() != null) {
@@ -181,60 +187,142 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return size;
}
- public PhysicalExec createJoinPlan(TaskAttemptContext ctx, JoinNode joinNode,
- PhysicalExec outer, PhysicalExec inner)
- throws IOException {
+ public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
+ PhysicalExec rightExec) throws IOException {
+
switch (joinNode.getJoinType()) {
case CROSS:
LOG.info("The planner chooses [Nested Loop Join]");
- return new NLJoinExec(ctx, joinNode, outer, inner);
+ return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
case INNER:
- String [] outerLineage = PlannerUtil.getLineage(joinNode.getLeftChild());
- String [] innerLineage = PlannerUtil.getLineage(joinNode.getRightChild());
- long outerSize = estimateSizeRecursive(ctx, outerLineage);
- long innerSize = estimateSizeRecursive(ctx, innerLineage);
+ return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
- final long threshold = 1048576 * 128; // 64MB
+ case FULL_OUTER:
+ case LEFT_OUTER:
+ case RIGHT_OUTER:
- boolean hashJoin = false;
- if (outerSize < threshold || innerSize < threshold) {
- hashJoin = true;
- }
+ case LEFT_SEMI:
+ case RIGHT_SEMI:
- if (hashJoin) {
- PhysicalExec selectedOuter;
- PhysicalExec selectedInner;
+ case LEFT_ANTI:
+ case RIGHT_ANTI:
- // HashJoinExec loads the inner relation to memory.
- if (outerSize <= innerSize) {
- selectedInner = outer;
- selectedOuter = inner;
- } else {
- selectedInner = inner;
- selectedOuter = outer;
- }
+ default:
+ throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
+ }
+ }
- LOG.info("The planner chooses [InMemory Hash Join]");
- return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
- }
+ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+ switch (algorithm) {
+ case NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, plan, leftExec, rightExec);
+ case BLOCK_NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ default:
+ // fallback algorithm
+ LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ }
- default:
- SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
- joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
- ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
- new SortNode(UNGENERATED_PID, sortSpecs[0], outer.getSchema(), outer.getSchema()),
- outer);
- ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
- new SortNode(UNGENERATED_PID, sortSpecs[1], inner.getSchema(), inner.getSchema()),
- inner);
-
- LOG.info("The planner chooses [Merge Join]");
- return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
- sortSpecs[0], sortSpecs[1]);
+ } else {
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
}
}
+ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+ switch (algorithm) {
+ case NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, plan, leftExec, rightExec);
+ case BLOCK_NESTED_LOOP_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, plan, leftExec, rightExec);
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+ return new HashJoinExec(context, plan, leftExec, rightExec);
+ case MERGE_JOIN:
+ LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+ return createMergeJoin(context, plan, leftExec, rightExec);
+ case HYBRID_HASH_JOIN:
+
+ default:
+ LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ return createMergeJoin(context, plan, leftExec, rightExec);
+ }
+
+
+ } else {
+ return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ long leftSize = estimateSizeRecursive(context, leftLineage);
+ long rightSize = estimateSizeRecursive(context, rightLineage);
+
+ final long threshold = 1048576 * 128; // 64MB
+
+ boolean hashJoin = false;
+ if (leftSize < threshold || rightSize < threshold) {
+ hashJoin = true;
+ }
+
+ if (hashJoin) {
+ PhysicalExec selectedOuter;
+ PhysicalExec selectedInner;
+
+ // HashJoinExec loads the inner relation to memory.
+ if (leftSize <= rightSize) {
+ selectedInner = leftExec;
+ selectedOuter = rightExec;
+ } else {
+ selectedInner = rightExec;
+ selectedOuter = leftExec;
+ }
+
+ LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
+ return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+ } else {
+ return createMergeJoin(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private MergeJoinExec createMergeJoin(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+ plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+ ExternalSortExec outerSort = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
+ leftExec);
+ ExternalSortExec innerSort = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
+ rightExec);
+
+ LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+ return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+ }
+
public PhysicalExec createStorePlan(TaskAttemptContext ctx,
StoreTableNode plan, PhysicalExec subOp) throws IOException {
if (plan.getPartitionType() == PartitionType.HASH_PARTITION
@@ -277,42 +365,94 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new SeqScanExec(ctx, sm, scanNode, fragments);
}
- public PhysicalExec createGroupByPlan(TaskAttemptContext ctx,
- GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
+ public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
+ throws IOException {
+
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, groupbyNode);
+ if (property != null) {
+ GroupbyAlgorithm algorithm = property.getGroupby().getAlgorithm();
+ if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
+ return createInMemoryHashAggregation(context, groupbyNode, subOp);
+ } else {
+ return createSortAggregation(context, groupbyNode, subOp);
+ }
+ }
+ return createBestAggregationPlan(context, groupbyNode, subOp);
+ }
+
+ private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+ throws IOException {
+ LOG.info("The planner chooses [Hash Aggregation]");
+ return new HashAggregateExec(ctx, groupbyNode, subOp);
+ }
+
+ private PhysicalExec createSortAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+ throws IOException {
+ Column[] grpColumns = groupbyNode.getGroupingColumns();
+ SortSpec[] specs = new SortSpec[grpColumns.length];
+ for (int i = 0; i < grpColumns.length; i++) {
+ specs[i] = new SortSpec(grpColumns[i], true, false);
+ }
+ SortNode sortNode = new SortNode(-1, specs);
+ sortNode.setInSchema(subOp.getSchema());
+ sortNode.setOutSchema(subOp.getSchema());
+ // SortExec sortExec = new SortExec(sortNode, child);
+ ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
+ LOG.info("The planner chooses [Sort Aggregation]");
+ return new SortAggregateExec(ctx, groupbyNode, sortExec);
+ }
+
+ private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+ PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
if (grpColumns.length == 0) {
+ return createInMemoryHashAggregation(context, groupbyNode, subOp);
+ }
+
+ String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+ long estimatedSize = estimateSizeRecursive(context, outerLineage);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
+
+ // if the relation size is less than the threshold,
+ // the hash aggregation will be used.
+ if (estimatedSize <= threshold) {
LOG.info("The planner chooses [Hash Aggregation]");
- return new HashAggregateExec(ctx, groupbyNode, subOp);
+ return createInMemoryHashAggregation(context, groupbyNode, subOp);
} else {
- String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
- long estimatedSize = estimateSizeRecursive(ctx, outerLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
-
- // if the relation size is less than the threshold,
- // the hash aggregation will be used.
- if (estimatedSize <= threshold) {
- LOG.info("The planner chooses [Hash Aggregation]");
- return new HashAggregateExec(ctx, groupbyNode, subOp);
+ return createSortAggregation(context, groupbyNode, subOp);
+ }
+ }
+
+ public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+ PhysicalExec child) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
+ if (property != null) {
+ SortEnforce.SortAlgorithm algorithm = property.getSort().getAlgorithm();
+ if (algorithm == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+ return new MemSortExec(context, sortNode, child);
} else {
- SortSpec[] specs = new SortSpec[grpColumns.length];
- for (int i = 0; i < grpColumns.length; i++) {
- specs[i] = new SortSpec(grpColumns[i], true, false);
- }
- SortNode sortNode = new SortNode(UNGENERATED_PID, specs);
- sortNode.setInSchema(subOp.getSchema());
- sortNode.setOutSchema(subOp.getSchema());
- // SortExec sortExec = new SortExec(sortNode, child);
- ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode,
- subOp);
- LOG.info("The planner chooses [Sort Aggregation]");
- return new SortAggregateExec(ctx, groupbyNode, sortExec);
+ return new ExternalSortExec(context, sm, sortNode, child);
}
}
+
+ return createBestSortPlan(context, sortNode, child);
}
- public PhysicalExec createSortPlan(TaskAttemptContext ctx, SortNode sortNode,
- PhysicalExec subOp) throws IOException {
- return new ExternalSortExec(ctx, sm, sortNode, subOp);
+ public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+ PhysicalExec child) throws IOException {
+ String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+ long estimatedSize = estimateSizeRecursive(context, outerLineage);
+ final long threshold = 1048576 * 2000;
+
+ // if the relation size is less than the reshold,
+ // the in-memory sort will be used.
+ if (estimatedSize <= threshold) {
+ return new MemSortExec(context, sortNode, child);
+ } else {
+ return new ExternalSortExec(context, sm, sortNode, child);
+ }
}
public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
@@ -335,4 +475,38 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
annotation.getDatum());
}
+
+ private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+ if (enforcer == null) {
+ return null;
+ }
+
+ EnforceType type;
+ if (node.getType() == NodeType.JOIN) {
+ type = EnforceType.JOIN;
+ } else if (node.getType() == NodeType.GROUP_BY) {
+ type = EnforceType.GROUP_BY;
+ } else if (node.getType() == NodeType.SORT) {
+ type = EnforceType.SORT;
+ } else {
+ return null;
+ }
+
+ if (enforcer.hasEnforceProperty(type)) {
+ List<EnforceProperty> properties = enforcer.getEnforceProperties(type);
+ EnforceProperty found = null;
+ for (EnforceProperty property : properties) {
+ if (type == EnforceType.JOIN && property.getJoin().getPid() == node.getPID()) {
+ found = property;
+ } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
+ found = property;
+ } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
+ found = property;
+ }
+ }
+ return found;
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
new file mode 100644
index 0000000..1b0a7c3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import java.io.IOException;
+
+public class PhysicalPlanningException extends IOException {
+ public PhysicalPlanningException(String message) {
+ super(message);
+ }
+
+ public PhysicalPlanningException(Throwable t) {
+ super(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 7b39d26..4907c40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -224,7 +224,7 @@ public class PlannerUtil {
* @param type to find
* @return the parent node of a found logical node
*/
- public static LogicalNode findTopParentNode(LogicalNode node, NodeType type) {
+ public static <T extends LogicalNode> T findTopParentNode(LogicalNode node, NodeType type) {
Preconditions.checkNotNull(node);
Preconditions.checkNotNull(type);
@@ -234,7 +234,7 @@ public class PlannerUtil {
if (finder.getFoundNodes().size() == 0) {
return null;
}
- return finder.getFoundNodes().get(0);
+ return (T) finder.getFoundNodes().get(0);
}
public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
new file mode 100644
index 0000000..a268a39
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.enforce;
+
+
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+
+public class Enforcer implements ProtoObject<EnforcerProto> {
+ Map<EnforceType, List<EnforceProperty>> properties;
+ private EnforcerProto proto;
+
+ @SuppressWarnings("unused")
+ public Enforcer() {
+ properties = TUtil.newHashMap();
+ }
+
+ public Enforcer(EnforcerProto proto) {
+ this.proto = proto;
+ }
+
+ private EnforceProperty.Builder newProperty() {
+ return EnforceProperty.newBuilder();
+ }
+
+ private void initProperties() {
+ if (properties == null) {
+ properties = TUtil.newHashMap();
+ for (EnforceProperty property : proto.getPropertiesList()) {
+ TUtil.putToNestedList(properties, property.getType(), property);
+ }
+ }
+ }
+
+ public boolean hasEnforceProperty(EnforceType type) {
+ initProperties();
+ return properties.containsKey(type);
+ }
+
+ public List<EnforceProperty> getEnforceProperties(EnforceType type) {
+ initProperties();
+ return properties.get(type);
+ }
+
+ public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
+ EnforceProperty.Builder builder = newProperty();
+ SortedInputEnforce.Builder enforce = SortedInputEnforce.newBuilder();
+ enforce.setTableName(tableName);
+ for (SortSpec sortSpec : sortSpecs) {
+ enforce.addSortSpecs(sortSpec.getProto());
+ }
+
+ builder.setType(EnforceType.SORTED_INPUT);
+ builder.setSortedInput(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addOutputDistinct() {
+ EnforceProperty.Builder builder = newProperty();
+ OutputDistinctEnforce.Builder enforce = OutputDistinctEnforce.newBuilder();
+
+ builder.setType(EnforceType.OUTPUT_DISTINCT);
+ builder.setOutputDistinct(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addJoin(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.JOIN);
+ builder.setJoin(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addGroupby(int pid, GroupbyEnforce.GroupbyAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.GROUP_BY);
+ builder.setGroupby(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addSort(int pid, SortEnforce.SortAlgorithm algorithm) {
+ EnforceProperty.Builder builder = newProperty();
+ SortEnforce.Builder enforce = SortEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(algorithm);
+
+ builder.setType(EnforceType.SORT);
+ builder.setSort(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void addBroadcast(String tableName) {
+ EnforceProperty.Builder builder = newProperty();
+ BroadcastEnforce.Builder enforce = BroadcastEnforce.newBuilder();
+ enforce.setTableName(tableName);
+
+ builder.setType(EnforceType.BROADCAST);
+ builder.setBroadcast(enforce);
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public Collection<EnforceProperty> getProperties() {
+ if (proto != null) {
+ return proto.getPropertiesList();
+ } else {
+ List<EnforceProperty> list = TUtil.newList();
+ for (List<EnforceProperty> propertyList : properties.values()) {
+ list.addAll(propertyList);
+ }
+ return list;
+ }
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Enforce ").append(properties.size()).append(" properties: ");
+ boolean first = true;
+ for (EnforceType enforceType : properties.keySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(enforceType);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public EnforcerProto getProto() {
+ EnforcerProto.Builder builder = EnforcerProto.newBuilder();
+ builder.addAllProperties(getProperties());
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 694602b..ba01b52 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -35,6 +35,7 @@ import java.util.List;
public class BNLJoinExec extends BinaryPhysicalExec {
// from logical plan
+ private JoinNode plan;
private EvalNode joinQual;
private EvalContext qualCtx;
@@ -57,11 +58,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
// projection
private final int[] targetIds;
- public BNLJoinExec(final TaskAttemptContext context, final JoinNode join,
+ public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
final PhysicalExec outer, PhysicalExec inner) {
super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
SchemaUtil.merge(outer.getSchema(), inner.getSchema()), outer, inner);
- this.joinQual = join.getJoinQual();
+ this.plan = plan;
+ this.joinQual = plan.getJoinQual();
if (joinQual != null) { // if join type is not 'cross join'
this.qualCtx = this.joinQual.newContext();
}
@@ -80,6 +82,10 @@ public class BNLJoinExec extends BinaryPhysicalExec {
outputTuple = new VTuple(outSchema.getColumnNum());
}
+ public JoinNode getPlan() {
+ return plan;
+ }
+
public Tuple next() throws IOException {
if (outerTupleSlots.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 6d49880..8fcd527 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -47,8 +47,7 @@ public class ExternalSortExec extends SortExec {
public ExternalSortExec(final TaskAttemptContext context,
final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
throws IOException {
- super(context, plan.getInSchema(), plan.getOutSchema(), child,
- plan.getSortKeys());
+ super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
this.plan = plan;
this.SORT_BUFFER_SIZE = context.getConf().getIntVar(ConfVars.EXT_SORT_BUFFER);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 1532307..a95e70e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -37,8 +37,7 @@ public class MemSortExec extends SortExec {
public MemSortExec(final TaskAttemptContext context,
SortNode plan, PhysicalExec child) {
- super(context, plan.getInSchema(), plan.getOutSchema(), child,
- plan.getSortKeys());
+ super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
this.plan = plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index 4f61a50..9d54cca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -34,8 +34,8 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
this.child = child;
}
- public PhysicalExec getChild() {
- return this.child;
+ public <T extends PhysicalExec> T getChild() {
+ return (T) this.child;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 0676277..0b08ccf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.query;
import org.apache.tajo.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
@@ -44,6 +45,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
private Boolean shouldDie;
private QueryContext queryContext;
private DataChannel dataChannel;
+ private Enforcer enforcer;
private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
private QueryUnitRequestProto.Builder builder = null;
@@ -57,9 +59,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext, DataChannel channel) {
+ String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
this();
- this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel);
+ this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
}
public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
@@ -71,7 +73,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
public void set(QueryUnitAttemptId id, List<Fragment> fragments,
String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext, DataChannel dataChannel) {
+ String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
this.id = id;
this.fragments = fragments;
this.outputTable = outputTable;
@@ -81,6 +83,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.queryContext = queryContext;
this.queryContext = queryContext;
this.dataChannel = dataChannel;
+ this.enforcer = enforcer;
}
@Override
@@ -214,14 +217,27 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
if (dataChannel != null) {
return dataChannel;
}
- if (!p.hasQueryContext()) {
+ if (!p.hasDataChannel()) {
return null;
}
this.dataChannel = new DataChannel(p.getDataChannel());
return this.dataChannel;
}
-
- public List<Fetch> getFetches() {
+
+ @Override
+ public Enforcer getEnforcer() {
+ QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (enforcer != null) {
+ return enforcer;
+ }
+ if (!p.hasEnforcer()) {
+ return null;
+ }
+ this.enforcer = new Enforcer(p.getEnforcer());
+ return this.enforcer;
+ }
+
+ public List<Fetch> getFetches() {
initFetches();
return this.fetches;
@@ -297,6 +313,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
if (this.dataChannel != null) {
builder.setDataChannel(dataChannel.getProto());
}
+ if (this.enforcer != null) {
+ builder.setEnforcer(enforcer.getProto());
+ }
}
private void mergeLocalToProto() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index a9f3706..971f13a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -24,6 +24,7 @@ package org.apache.tajo.ipc.protocolrecords;
import org.apache.tajo.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.QueryContext;
import org.apache.tajo.storage.Fragment;
@@ -46,4 +47,5 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
public void setShouldDie();
public QueryContext getQueryContext();
public DataChannel getDataChannel();
+ public Enforcer getEnforcer();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index 5637d58..3c03e50 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -16,6 +16,7 @@ package org.apache.tajo.master;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.*;
import java.util.*;
@@ -37,6 +38,7 @@ public class ExecutionBlock {
private ExecutionBlock parent;
private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
private PartitionType outputType;
+ private Enforcer enforcer = new Enforcer();
private boolean hasJoinPlan;
private boolean hasUnionPlan;
@@ -51,10 +53,6 @@ public class ExecutionBlock {
return executionBlockId;
}
- public void setPartitionType(PartitionType partitionType) {
- this.outputType = partitionType;
- }
-
public PartitionType getPartitionType() {
return outputType;
}
@@ -96,6 +94,10 @@ public class ExecutionBlock {
return plan;
}
+ public Enforcer getEnforcer() {
+ return enforcer;
+ }
+
public boolean isRoot() {
return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 24eea42..cdbd803 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -466,7 +466,7 @@ public class TaskSchedulerImpl extends AbstractService
false,
task.getLogicalPlan().toJson(),
context.getQueryContext(),
- subQuery.getDataChannel());
+ subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
if (!subQuery.getBlock().isRoot()) {
taskAssign.setInterQuery();
}
@@ -512,7 +512,8 @@ public class TaskSchedulerImpl extends AbstractService
false,
task.getLogicalPlan().toJson(),
context.getQueryContext(),
- subQuery.getDataChannel());
+ subQuery.getDataChannel(),
+ subQuery.getBlock().getEnforcer());
if (!subQuery.getBlock().isRoot()) {
taskAssign.setInterQuery();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 63d8f04..0a45cfb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -151,6 +151,7 @@ public class Task {
request.getFragments().toArray(new Fragment[request.getFragments().size()]),
taskDir);
this.context.setDataChannel(request.getDataChannel());
+ this.context.setEnforcer(request.getEnforcer());
plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
interQuery = request.getProto().getInterQuery();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 1726c7b..e70f780 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -61,6 +61,7 @@ message QueryUnitRequestProto {
optional bool shouldDie = 8;
optional KeyValueSetProto queryContext = 9;
optional DataChannelProto dataChannel = 10;
+ optional EnforcerProto enforcer = 11;
}
message Fetch {
@@ -139,7 +140,6 @@ message DataChannelProto {
repeated ColumnProto partitionKey = 7;
optional int32 partitionNum = 8 [default = 1];
- repeated SortSpecProto sortSpecs = 9;
optional StoreType storeType = 10 [default = CSV];
}
@@ -166,4 +166,75 @@ service TajoWorkerProtocolService {
//from QueryMaster(Worker)
rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+}
+
+message EnforceProperty {
+ enum EnforceType {
+ SORTED_INPUT = 0;
+ OUTPUT_DISTINCT = 1;
+ GROUP_BY = 2;
+ JOIN = 3;
+ SORT = 4;
+ BROADCAST = 5;
+ }
+
+ // Identifies which field is filled in.
+ required EnforceType type = 1;
+
+ // One of the following will be filled in.
+ optional SortedInputEnforce sortedInput = 2;
+ optional OutputDistinctEnforce outputDistinct = 3;
+ optional GroupbyEnforce groupby = 4;
+ optional JoinEnforce join = 5;
+ optional SortEnforce sort = 6;
+ optional BroadcastEnforce broadcast = 7;
+}
+
+message SortedInputEnforce {
+ required string tableName = 1;
+ repeated SortSpecProto sortSpecs = 2;
+}
+
+message OutputDistinctEnforce {
+}
+
+message JoinEnforce {
+ enum JoinAlgorithm {
+ NESTED_LOOP_JOIN = 0;
+ BLOCK_NESTED_LOOP_JOIN = 1;
+ IN_MEMORY_HASH_JOIN = 2;
+ HYBRID_HASH_JOIN = 3;
+ MERGE_JOIN = 4;
+ }
+
+ required int32 pid = 1;
+ required JoinAlgorithm algorithm = 2;
+}
+
+message GroupbyEnforce {
+ enum GroupbyAlgorithm {
+ HASH_AGGREGATION = 0;
+ SORT_AGGREGATION = 1;
+ }
+
+ required int32 pid = 1;
+ required GroupbyAlgorithm algorithm = 2;
+}
+
+message SortEnforce {
+ enum SortAlgorithm {
+ IN_MEMORY_SORT = 0;
+ MERGE_SORT = 1;
+ }
+
+ required int32 pid = 1;
+ required SortAlgorithm algorithm = 2;
+}
+
+message BroadcastEnforce {
+ required string tableName = 1;
+}
+
+message EnforcerProto {
+ repeated EnforceProperty properties = 1;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 118f352..425418f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -30,12 +30,12 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -45,6 +45,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -134,29 +135,27 @@ public class TestBNLJoinExec {
@Test
public final void testBNLCrossJoin() throws IOException, PlanningException {
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
-
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
-
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
- Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ ctx.setEnforcer(enforcer);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
- NLJoinExec nlJoin = (NLJoinExec) proj.getChild();
- SeqScanExec scanOuter = (SeqScanExec) nlJoin.getLeftChild();
- SeqScanExec scanInner = (SeqScanExec) nlJoin.getRightChild();
-
- BNLJoinExec bnl = new BNLJoinExec(ctx, nlJoin.getPlan(), scanOuter, scanInner);
- proj.setChild(bnl);
+ assertTrue(proj.getChild() instanceof BNLJoinExec);
int i = 0;
exec.init();
@@ -169,45 +168,31 @@ public class TestBNLJoinExec {
@Test
public final void testBNLInnerJoin() throws IOException, PlanningException {
+ Expr context = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
-
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
- TaskAttemptContext ctx =
- new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
- merged, workDir);
- Expr context = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+ merged, workDir);
+ ctx.setEnforcer(enforcer);
+
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
- SeqScanExec scanOuter = null;
- SeqScanExec scanInner = null;
-
ProjectionExec proj = (ProjectionExec) exec;
- JoinNode joinNode = null;
- if (proj.getChild() instanceof MergeJoinExec) {
- MergeJoinExec join = (MergeJoinExec) proj.getChild();
- ExternalSortExec sortOut = (ExternalSortExec) join.getLeftChild();
- ExternalSortExec sortIn = (ExternalSortExec) join.getRightChild();
- scanOuter = (SeqScanExec) sortOut.getChild();
- scanInner = (SeqScanExec) sortIn.getChild();
- joinNode = join.getPlan();
- } else if (proj.getChild() instanceof HashJoinExec) {
- HashJoinExec join = (HashJoinExec) proj.getChild();
- scanOuter = (SeqScanExec) join.getLeftChild();
- scanInner = (SeqScanExec) join.getRightChild();
- joinNode = join.getPlan();
- }
-
- BNLJoinExec bnl = new BNLJoinExec(ctx, joinNode, scanOuter,
- scanInner);
- proj.setChild(bnl);
+ assertTrue(proj.getChild() instanceof BNLJoinExec);
Tuple tuple;
int i = 1;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 864c776..3ba7fec 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -113,6 +114,7 @@ public class TestExternalSortExec {
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = plan.getRootBlock().getRoot();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 4b5a422..47d6a94 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -149,6 +150,7 @@ public class TestHashAntiJoinExec {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(expr);
optimizer.optimize(plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 913c60e..3466d17 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -30,11 +30,12 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -44,6 +45,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -132,35 +134,27 @@ public class TestHashJoinExec {
@Test
public final void testHashInnerJoin() throws IOException, PlanningException {
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
- Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
- Integer.MAX_VALUE);
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+ Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+ Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
- Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ ctx.setEnforcer(enforcer);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
- if (proj.getChild() instanceof MergeJoinExec) {
- MergeJoinExec join = (MergeJoinExec) proj.getChild();
- ExternalSortExec sortout = (ExternalSortExec) join.getLeftChild();
- ExternalSortExec sortin = (ExternalSortExec) join.getRightChild();
- SeqScanExec scanout = (SeqScanExec) sortout.getChild();
- SeqScanExec scanin = (SeqScanExec) sortin.getChild();
-
- HashJoinExec hashjoin = new HashJoinExec(ctx, join.getPlan(), scanout, scanin);
- proj.setChild(hashjoin);
-
- exec = proj;
- }
+ assertTrue(proj.getChild() instanceof HashJoinExec);
Tuple tuple;
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 377be20..1db8300 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -153,6 +154,7 @@ public class TestHashSemiJoinExec {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(expr);
optimizer.optimize(plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 141fbb7..cae4853 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -32,8 +32,12 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -43,6 +47,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -145,71 +150,27 @@ public class TestMergeJoinExec {
@Test
public final void testMergeInnerJoin() throws IOException, PlanningException {
- Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
- Integer.MAX_VALUE);
- Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(),
- Integer.MAX_VALUE);
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(expr);
+ LogicalNode root = plan.getRootBlock().getRoot();
+ JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);;
+
+ Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+ Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
- Expr expr = analyzer.parse(QUERIES[0]);
- LogicalPlan plan = planner.createPlan(expr);
- LogicalNode root = plan.getRootBlock().getRoot();
+ ctx.setEnforcer(enforcer);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, root);
-
ProjectionExec proj = (ProjectionExec) exec;
-
- // TODO - should be planed with user's optimization hint
- if (!(proj.getChild() instanceof MergeJoinExec)) {
- BinaryPhysicalExec nestedLoopJoin = (BinaryPhysicalExec) proj.getChild();
- SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getLeftChild();
- SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getRightChild();
-
- SeqScanExec tmp;
- if (!outerScan.getTableName().equals("employee")) {
- tmp = outerScan;
- outerScan = innerScan;
- innerScan = tmp;
- }
-
- SortSpec[] outerSortKeys = new SortSpec[2];
- SortSpec[] innerSortKeys = new SortSpec[2];
-
- Schema employeeSchema = catalog.getTableDesc("employee").getMeta()
- .getSchema();
- employeeSchema.setQualifier("e", true);
- outerSortKeys[0] = new SortSpec(
- employeeSchema.getColumnByName("empId"));
- outerSortKeys[1] = new SortSpec(
- employeeSchema.getColumnByName("memId"));
- SortNode outerSort = new SortNode(plan.newPID(), outerSortKeys);
- outerSort.setInSchema(outerScan.getSchema());
- outerSort.setOutSchema(outerScan.getSchema());
-
- Schema peopleSchema = catalog.getTableDesc("people").getMeta().getSchema();
- peopleSchema.setQualifier("p", true);
- innerSortKeys[0] = new SortSpec(
- peopleSchema.getColumnByName("empId"));
- innerSortKeys[1] = new SortSpec(
- peopleSchema.getColumnByName("fk_memid"));
- SortNode innerSort = new SortNode(plan.newPID(), innerSortKeys);
- innerSort.setInSchema(innerScan.getSchema());
- innerSort.setOutSchema(innerScan.getSchema());
-
- MemSortExec outerSortExec = new MemSortExec(ctx, outerSort, outerScan);
- MemSortExec innerSortExec = new MemSortExec(ctx, innerSort, innerScan);
-
- MergeJoinExec mergeJoin = new MergeJoinExec(ctx,
- ((HashJoinExec)nestedLoopJoin).getPlan(), outerSortExec, innerSortExec,
- outerSortKeys, innerSortKeys);
- proj.setChild(mergeJoin);
- exec = proj;
- }
+ assertTrue(proj.getChild() instanceof MergeJoinExec);
Tuple tuple;
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index e79e2f6..5483235 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -34,6 +34,7 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
@@ -149,6 +150,7 @@ public class TestNLJoinExec {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[0]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
@@ -176,6 +178,7 @@ public class TestNLJoinExec {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
//LogicalOptimizer.optimize(ctx, plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index f3aa20a..c5366a1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -39,8 +39,10 @@ import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -58,7 +60,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce;
import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
import static org.junit.Assert.*;
public class TestPhysicalPlanner {
@@ -241,6 +245,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
optimizer.optimize(plan);
@@ -271,6 +276,7 @@ public class TestPhysicalPlanner {
"target/test-data/testHashGroupByPlanWithALLField");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[15]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -299,6 +305,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[]{frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
optimizer.optimize(plan);
@@ -355,8 +362,8 @@ public class TestPhysicalPlanner {
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] },
- workDir);
+ new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
ctx.setOutputPath(new Path(workDir, "grouped1"));
Expr context = analyzer.parse(CreateTableAsStmts[0]);
@@ -396,8 +403,8 @@ public class TestPhysicalPlanner {
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] },
- workDir);
+ new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
ctx.setOutputPath(new Path(workDir, "grouped2"));
Expr context = analyzer.parse(CreateTableAsStmts[1]);
@@ -437,9 +444,9 @@ public class TestPhysicalPlanner {
QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
- LogicalNode rootNode = plan.getRootBlock().getRoot();
int numPartitions = 3;
Column key1 = new Column("score.deptName", Type.TEXT);
@@ -448,7 +455,7 @@ public class TestPhysicalPlanner {
PartitionType.HASH_PARTITION, numPartitions);
dataChannel.setPartitionKey(new Column[]{key1, key2});
ctx.setDataChannel(dataChannel);
- rootNode = optimizer.optimize(plan);
+ LogicalNode rootNode = optimizer.optimize(plan);
TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV);
@@ -496,8 +503,8 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir(
"target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
- TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] },
- workDir);
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[14]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = plan.getRootBlock().getRoot();
@@ -552,6 +559,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[8]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -585,6 +593,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[9]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -616,6 +625,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[11]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -726,6 +736,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(duplicateElimination[0]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -758,6 +769,7 @@ public class TestPhysicalPlanner {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(SORT_QUERY[0]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -771,12 +783,6 @@ public class TestPhysicalPlanner {
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-// ProjectionExec proj = (ProjectionExec) exec;
-// ExternalSortExec sort = (ExternalSortExec) proj.getChild();
-//
-// SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
- //IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort, sort.getSchema(), sort.getSchema(), sortSpecs);
-
Tuple tuple;
exec.init();
exec.next();
@@ -843,4 +849,101 @@ public class TestPhysicalPlanner {
scanner.close();
}
+
+ @Test
+ public final void testSortEnforcer() throws IOException, PlanningException {
+ Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ employee.getPath(), Integer.MAX_VALUE);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
+ Expr context = analyzer.parse(SORT_QUERY[0]);
+ LogicalPlan plan = planner.createPlan(context);
+ optimizer.optimize(plan);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+ Enforcer enforcer = new Enforcer();
+ enforcer.addSort(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+ new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(((ProjectionExec)exec).getChild() instanceof MemSortExec);
+
+ context = analyzer.parse(SORT_QUERY[0]);
+ plan = planner.createPlan(context);
+ optimizer.optimize(plan);
+ rootNode = plan.getRootBlock().getRoot();
+
+ sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+ enforcer = new Enforcer();
+ enforcer.addSort(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
+ ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+ new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(((ProjectionExec)exec).getChild() instanceof ExternalSortExec);
+ }
+
+ @Test
+ public final void testGroupByEnforcer() throws IOException, PlanningException {
+ Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
+ Expr context = analyzer.parse(QUERIES[7]);
+ LogicalPlan plan = planner.createPlan(context);
+ optimizer.optimize(plan);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+ Enforcer enforcer = new Enforcer();
+ enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.HASH_AGGREGATION);
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+ new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(exec instanceof HashAggregateExec);
+
+ context = analyzer.parse(QUERIES[7]);
+ plan = planner.createPlan(context);
+ optimizer.optimize(plan);
+ rootNode = plan.getRootBlock().getRoot();
+
+ groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+ enforcer = new Enforcer();
+ enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.SORT_AGGREGATION);
+ ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+ new Fragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(exec instanceof SortAggregateExec);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index c655e05..b75c5e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -110,8 +111,8 @@ public class TestSortExec {
Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
- .newQueryUnitAttemptId(),
- new Fragment[] { frags[0] }, workDir);
+ .newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);