You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/07 08:32:51 UTC
[2/2] git commit: TAJO-583: Broadcast join does not work on partitioned tables.
TAJO-583: Broadcast join does not work on partitioned tables.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/4179a7c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/4179a7c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/4179a7c9
Branch: refs/heads/master
Commit: 4179a7c9265123bae0a81cacd5eee53a96f98836
Parents: 009e025
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Feb 7 14:01:45 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Feb 7 14:02:04 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../main/java/org/apache/tajo/util/TUtil.java | 2 +-
.../engine/planner/PhysicalPlannerImpl.java | 31 +++++-
.../apache/tajo/engine/planner/PlanString.java | 14 +++
.../apache/tajo/engine/planner/PlannerUtil.java | 87 +++++++++++----
.../engine/planner/global/ExecutionBlock.java | 5 +-
.../engine/planner/global/GlobalPlanner.java | 50 +++++++--
.../tajo/engine/planner/global/MasterPlan.java | 9 +-
.../planner/physical/HashLeftOuterJoinExec.java | 2 -
.../physical/PartitionMergeScanExec.java | 107 +++++++++++++++++++
.../planner/rewrite/FilterPushDownRule.java | 20 +++-
.../planner/rewrite/ProjectionPushDownRule.java | 1 +
.../tajo/master/querymaster/Repartitioner.java | 68 +++++++-----
.../tajo/master/querymaster/SubQuery.java | 28 ++---
.../apache/tajo/LocalTajoTestingUtility.java | 16 ++-
.../java/org/apache/tajo/QueryTestCaseBase.java | 28 +++--
.../org/apache/tajo/TajoTestingCluster.java | 4 +-
.../test/java/org/apache/tajo/TpchTestBase.java | 3 +-
.../engine/planner/physical/TestSortExec.java | 19 ++--
.../apache/tajo/engine/query/TestCTASQuery.java | 12 +--
.../query/TestJoinOnPartitionedTables.java | 50 +++++++++
.../tajo/engine/query/TestTablePartitions.java | 52 ++++-----
.../TestGroupByQuery/testGroupByNested1.sql | 9 +-
.../TestGroupByQuery/testGroupByNested2.sql | 11 +-
.../customer_ddl.sql | 9 ++
.../insert_into_customer.sql | 11 ++
.../selfJoinOfPartitionedTable.sql | 9 ++
.../testNoProjectionJoinQual.sql | 1 +
.../testPartialFilterPushDown.sql | 9 ++
.../testPartitionTableJoinSmallTable.sql | 11 ++
.../TestJoinQuery/testFullOuterJoin1.sql | 9 +-
.../TestJoinQuery/testLeftOuterJoin1.sql | 8 +-
.../testLeftOuterJoinWithConstantExpr1.sql | 10 +-
.../testLeftOuterJoinWithConstantExpr2.sql | 10 +-
.../testLeftOuterJoinWithConstantExpr3.sql | 4 +-
.../TestJoinQuery/testRightOuterJoin1.sql | 9 +-
.../queries/TestTablePartitions/case3.sql | 8 ++
.../create_partitioned_table_as_select.sql | 19 +++-
.../TestGroupByQuery/testGroupByNested1.result | 4 +-
.../TestGroupByQuery/testGroupByNested2.result | 4 +-
.../selfJoinOfPartitionedTable.result | 7 ++
.../testNoProjectionJoinQual.result | 3 +
.../testPartialFilterPushDown.result | 3 +
.../testPartitionTableJoinSmallTable.result | 7 ++
.../results/TestTablePartitions/case3.result | 5 +
.../java/org/apache/tajo/storage/CSVFile.java | 2 +-
46 files changed, 632 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36c1062..8dc5ee5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -246,7 +246,9 @@ Release 0.8.0 - unreleased
BUG FIXES
- TAJO-588: In some case, leaf task of DefaultTaskScheduler are not
+ TAJO-583: Broadcast join does not work on partitioned tables. (hyunsik)
+
+ TAJO-588: In some case, leaf task of DefaultTaskScheduler are not
distributed execution. (jinho)
TAJO-586: containFunction shouldn't throw NoSuchFunctionException. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index e9d8219..cc694d4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -168,7 +168,7 @@ public class TUtil {
}
public static String collectionToString(Collection objects) {
- boolean first = false;
+ boolean first = true;
StringBuilder sb = new StringBuilder();
for(Object object : objects) {
if (first) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index b8c52c0..dfa2e40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -139,8 +139,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
ProjectionExec projectionExec = new ProjectionExec(ctx, subQueryNode, leftExec);
return projectionExec;
-
}
+
case PARTITIONS_SCAN:
case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
@@ -749,6 +749,35 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
"Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
+ Enforcer enforcer = ctx.getEnforcer();
+
+ // check if this table is broadcasted one or not.
+ boolean broadcastFlag = false;
+ if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+ List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+ for (EnforceProperty property : properties) {
+ broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+ }
+ }
+
+ if (scanNode instanceof PartitionedTableScanNode
+ && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
+ ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
+
+ if (scanNode instanceof PartitionedTableScanNode) {
+ if (broadcastFlag) {
+ PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+ List<FileFragment> fileFragments = TUtil.newList();
+ for (Path path : partitionedTableScanNode.getInputPaths()) {
+ fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+ }
+
+ return new PartitionMergeScanExec(ctx, sm, scanNode,
+ FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+ }
+ }
+ }
+
FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
return new SeqScanExec(ctx, sm, scanNode, fragments);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
index ef8bed0..4ecad60 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
@@ -96,4 +96,18 @@ public class PlanString {
currentDetail = null;
}
}
+
+ /**
+ * Renders this plan string as multi-line text: the title on the first line, then one line per
+ * explanation, then one line per detail, each of those prefixed with " => ".
+ */
+ @Override
+ public String toString() {
+ StringBuilder output = new StringBuilder();
+ output.append(getTitle()).append("\n");
+
+ for (String str : getExplanations()) {
+ output.append(" => ").append(str).append("\n");
+ }
+
+ for (String str : getDetails()) {
+ output.append(" => ").append(str).append("\n");
+ }
+ return output.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index bd9001c..b59cdda 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -34,6 +34,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.TUtil;
@@ -162,41 +163,81 @@ public class PlannerUtil {
this.tobeReplaced = tobeReplaced;
}
+ /**
+ * Tells whether the given node can have children (it is a unary or a binary node) and therefore
+ * has to be descended into while looking for the node to replace.
+ */
+ private static boolean checkIfVisitable(LogicalNode node) {
+ if (node instanceof UnaryNode) {
+ return true;
+ }
+ return node instanceof BinaryNode;
+ }
+
+ /**
+ * Walks the subtree below {@code node} and replaces every child that deep-equals {@code target}
+ * with {@code tobeReplaced}. After a replacement, the schemas of the nodes above it are refreshed
+ * on the way back up, up to and including the first Projectable node.
+ *
+ * @return {@code node} itself; its children are replaced in place
+ */
@Override
public LogicalNode visit(ReplacerContext context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block,
LogicalNode node, Stack<LogicalNode> stack) throws PlanningException {
- LogicalNode child = super.visit(context, plan, null, node, stack);
-
- if (node.deepEquals(target)) {
- LogicalNode parent = stack.peek();
+ LogicalNode left = null;
+ LogicalNode right = null;
- if (parent instanceof BinaryNode) {
- BinaryNode binaryParent = (BinaryNode) parent;
- if (binaryParent.getLeftChild().deepEquals(target)) {
- binaryParent.setLeftChild(tobeReplaced);
- }
- if (binaryParent.getRightChild().deepEquals(target)) {
- binaryParent.setRightChild(tobeReplaced);
- }
- } else if (parent instanceof UnaryNode) {
- UnaryNode unaryParent = (UnaryNode) parent;
- unaryParent.setChild(tobeReplaced);
+ if (node instanceof UnaryNode) {
+ UnaryNode unaryNode = (UnaryNode) node;
+ if (unaryNode.getChild().deepEquals(target)) {
+ unaryNode.setChild(tobeReplaced);
+ left = tobeReplaced;
+ context.updateSchemaFlag = true;
+ } else if (checkIfVisitable(unaryNode.getChild())) {
+ left = visit(context, plan, null, unaryNode.getChild(), stack);
+ } else {
+ // A leaf child is neither replaced nor visited, but its schema is still needed below when
+ // updateSchemaFlag was left set by an earlier sibling subtree (the context is shared).
+ left = unaryNode.getChild();
+ }
+ } else if (node instanceof BinaryNode) {
+ BinaryNode binaryNode = (BinaryNode) node;
+ if (binaryNode.getLeftChild().deepEquals(target)) {
+ binaryNode.setLeftChild(tobeReplaced);
+ left = tobeReplaced;
+ context.updateSchemaFlag = true;
+ } else if (checkIfVisitable(binaryNode.getLeftChild())) {
+ left = visit(context, plan, null, binaryNode.getLeftChild(), stack);
+ } else {
+ left = binaryNode.getLeftChild();
}
- context.updateSchemaFlag = true;
+ if (binaryNode.getRightChild().deepEquals(target)) {
+ binaryNode.setRightChild(tobeReplaced);
+ right = tobeReplaced;
+ context.updateSchemaFlag = true;
+ } else if (checkIfVisitable(binaryNode.getRightChild())) {
+ right = visit(context, plan, null, binaryNode.getRightChild(), stack);
+ } else {
+ right = binaryNode.getRightChild();
+ }
}
- if (context.updateSchemaFlag && !node.deepEquals(target)) {
+ // update schemas of nodes except for leaf node (i.e., RelationNode); a leaf has no child whose
+ // schema could be taken (left stays null), so it is skipped instead of throwing an NPE.
+ if (context.updateSchemaFlag && left != null) {
if (node instanceof Projectable) {
- node.setInSchema(child.getOutSchema());
+ if (node instanceof BinaryNode) {
+ node.setInSchema(SchemaUtil.merge(left.getOutSchema(), right.getOutSchema()));
+ } else {
+ node.setInSchema(left.getOutSchema());
+ }
context.updateSchemaFlag = false;
} else {
- node.setInSchema(child.getOutSchema());
- node.setOutSchema(child.getOutSchema());
+ node.setInSchema(left.getOutSchema());
+ node.setOutSchema(left.getOutSchema());
}
}
return node;
}
+
+ /**
+ * A scan is a leaf of the plan, so there is nothing beneath it to replace; the node is returned
+ * unchanged.
+ */
+ @Override
+ public LogicalNode visitScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+ ScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+ return node;
+ }
+
+ /**
+ * A partitioned table scan is a leaf as well; the node is returned unchanged.
+ */
+ @Override
+ public LogicalNode visitPartitionedTableScan(ReplacerContext context, LogicalPlan plan,
+ LogicalPlan.QueryBlock block, PartitionedTableScanNode node,
+ Stack<LogicalNode> stack) throws PlanningException {
+ return node;
+ }
}
public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) {
@@ -632,7 +673,11 @@ public class PlannerUtil {
public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
try {
T copy = (T) node.clone();
- copy.setPID(plan.newPID());
+ if (plan == null) {
+ copy.setPID(-1);
+ } else {
+ copy.setPID(plan.newPID());
+ }
return copy;
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f3976e..7df6b43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -108,12 +108,9 @@ public class ExecutionBlock {
return hasUnionPlan;
}
- public void addBroadcastTables(Collection<String> tableNames) {
- broadcasted.addAll(tableNames);
- }
-
+ /**
+ * Marks the given table as broadcasted in this execution block. The name is recorded in the
+ * block's broadcast set and is also registered as a broadcast property on the block's enforcer.
+ */
public void addBroadcastTable(String tableName) {
broadcasted.add(tableName);
+ // The physical planner looks for EnforceType.BROADCAST properties when choosing how to scan a table.
+ enforcer.addBroadcast(tableName);
}
public boolean isBroadcastTable(String tableName) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index cb2ac15..2aa93d7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -30,10 +30,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
@@ -42,6 +39,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
import java.util.*;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
/**
@@ -55,7 +53,7 @@ public class GlobalPlanner {
public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm) throws IOException {
this.conf = conf;
- this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
+ this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
Preconditions.checkArgument(storeType != null);
}
@@ -137,6 +135,42 @@ public class GlobalPlanner {
return channel;
}
+ /**
+ * Computes the total volume, in bytes, of all relation nodes in the subtree rooted at the given node.
+ *
+ * @param node the root of the plan subtree to inspect
+ * @return the summed {@code getNumBytes()} of every table scan in the subtree, or
+ * {@link Long#MAX_VALUE} if some scan has no table statistics; sums saturate at
+ * {@link Long#MAX_VALUE} instead of overflowing
+ * @throws PlanningException if a node is neither a relation, a unary, nor a binary node
+ */
+ public static long computeDescendentVolume(LogicalNode node) throws PlanningException {
+
+ if (node instanceof RelationNode) {
+ switch (node.getType()) {
+ case SCAN:
+ case PARTITIONS_SCAN:
+ ScanNode scanNode = (ScanNode) node;
+ if (scanNode.getTableDesc().getStats() == null) {
+ // TODO - this case means that data is not located in HDFS. So, we need additional
+ // broadcast method.
+ return Long.MAX_VALUE;
+ } else {
+ return scanNode.getTableDesc().getStats().getNumBytes();
+ }
+ case TABLE_SUBQUERY:
+ return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery());
+ default:
+ throw new IllegalArgumentException("Not RelationNode");
+ }
+ } else if (node instanceof UnaryNode) {
+ return computeDescendentVolume(((UnaryNode) node).getChild());
+ } else if (node instanceof BinaryNode) {
+ BinaryNode binaryNode = (BinaryNode) node;
+ // An unknown side (Long.MAX_VALUE) plus any positive volume would wrap around to a negative
+ // number and look smaller than every real table, so the sum saturates instead.
+ return saturatedAdd(computeDescendentVolume(binaryNode.getLeftChild()),
+ computeDescendentVolume(binaryNode.getRightChild()));
+ }
+
+ throw new PlanningException("Invalid State");
+ }
+
+ /**
+ * Adds two longs, clamping the result to Long.MIN_VALUE / Long.MAX_VALUE instead of overflowing.
+ */
+ private static long saturatedAdd(long a, long b) {
+ long sum = a + b;
+ // Overflow occurred iff both operands share a sign that differs from the sign of the sum.
+ if (((a ^ sum) & (b ^ sum)) < 0) {
+ return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
+ }
+ return sum;
+ }
+
+ /**
+ * Tells whether a join input is a plain or a partitioned table scan, i.e. whether it qualifies as
+ * an input of a broadcast join.
+ */
+ private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) {
+ NodeType type = node.getType();
+ return type == NodeType.SCAN || type == NodeType.PARTITIONS_SCAN;
+ }
+
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
ExecutionBlock leftBlock, ExecutionBlock rightBlock)
throws PlanningException {
@@ -149,7 +183,7 @@ public class GlobalPlanner {
boolean leftBroadcasted = false;
boolean rightBroadcasted = false;
- if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
+ if (checkIfCanBeOneOfBroadcastJoin(leftNode) && checkIfCanBeOneOfBroadcastJoin(rightNode)) {
ScanNode leftScan = (ScanNode) leftNode;
ScanNode rightScan = (ScanNode) rightNode;
@@ -169,9 +203,13 @@ public class GlobalPlanner {
currentBlock.setPlan(joinNode);
if (leftBroadcasted) {
currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+ // Report the left scan itself: its name and volume, not those of the right scan.
+ LOG.info("The left table " + leftScan.getCanonicalName() + " ("
+ + leftScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
}
if (rightBroadcasted) {
currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+ LOG.info("The right table " + rightScan.getCanonicalName() + " ("
+ + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
}
context.execBlockMap.remove(leftScan.getPID());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index f6ac2be..91f658d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -27,6 +27,7 @@ import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
import java.util.ArrayList;
import java.util.HashMap;
@@ -233,14 +234,20 @@ public class MasterPlan {
continue;
}
+ if (block.getBroadcastTables().size() > 0) {
+ sb.append("\nBroadcasted Tables: ").append(TUtil.collectionToString(block.getBroadcastTables()));
+ sb.append("\n");
+ }
+
if (!isLeaf(block)) {
sb.append("\n[Incoming]\n");
for (DataChannel channel : getIncomingChannels(block.getId())) {
sb.append(channel).append("\n");
}
}
- sb.append("\n[Outgoing]\n");
+
if (!isRoot(block)) {
+ sb.append("\n[Outgoing]\n");
for (DataChannel channel : getOutgoingChannels(block.getId())) {
sb.append(channel);
sb.append("\n");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index d0c9897..314b3d9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -62,7 +62,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
protected final Projector projector;
private int rightNumCols;
- private int leftNumCols;
private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
@@ -94,7 +93,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
outTuple = new VTuple(outSchema.getColumnNum());
leftKeyTuple = new VTuple(leftKeyList.length);
- leftNumCols = leftChild.getSchema().getColumnNum();
rightNumCols = rightChild.getSchema().getColumnNum();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
new file mode 100644
index 0000000..a39f4be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A physical scan operator that reads several fragments (e.g. the file fragments of all the
+ * partitions of a partitioned table) one after another as a single stream of tuples. One
+ * {@link SeqScanExec} is created per fragment; child scanners are init()-ed lazily, so at most one
+ * of them is open at any time.
+ */
+public class PartitionMergeScanExec extends PhysicalExec {
+ private final ScanNode plan;
+ /** The child being read; null before init(), after close(), or when there are no fragments. */
+ private SeqScanExec currentScanner = null;
+
+ private final CatalogProtos.FragmentProto [] fragments;
+
+ private final List<SeqScanExec> scanners = Lists.newArrayList();
+ private Iterator<SeqScanExec> iterator;
+
+ private final AbstractStorageManager sm;
+
+ public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+ ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
+ super(context, plan.getInSchema(), plan.getOutSchema());
+
+ this.plan = plan;
+ this.fragments = fragments;
+ this.sm = sm;
+ }
+
+ /**
+ * Creates one child scanner per fragment, each over its own clone of the scan plan, and opens
+ * the first one.
+ */
+ public void init() throws IOException {
+ for (CatalogProtos.FragmentProto fragment : fragments) {
+ scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
+ new CatalogProtos.FragmentProto[] {fragment}));
+ }
+ rescan();
+ }
+
+ /**
+ * Returns the next tuple of the current fragment, moving on to the following fragment each time
+ * one is exhausted. Returns null once every fragment has been read.
+ */
+ @Override
+ public Tuple next() throws IOException {
+ while (currentScanner != null) {
+ Tuple tuple = currentScanner.next();
+ if (tuple != null) {
+ return tuple;
+ }
+
+ if (!iterator.hasNext()) {
+ break; // all fragments consumed; the last child stays open until close() or rescan()
+ }
+
+ // The current fragment is exhausted: release it before opening the next one.
+ currentScanner.close();
+ currentScanner = iterator.next();
+ currentScanner.init();
+ }
+ return null;
+ }
+
+ /**
+ * Restarts the scan from the first fragment. Any child scanner that is still open is closed
+ * first, so that it does not leak.
+ */
+ @Override
+ public void rescan() throws IOException {
+ closeCurrentScanner();
+ if (scanners.size() > 0) {
+ iterator = scanners.iterator();
+ // NOTE(review): this re-initializes a child that may have been closed earlier, as the original
+ // rescan logic already did -- assumes SeqScanExec supports init() after close(); confirm.
+ currentScanner = iterator.next();
+ currentScanner.init();
+ }
+ }
+
+ /**
+ * Releases the child scanner that is still open. Exhausted children were already closed by next(),
+ * and children that were never reached were never init()-ed, so neither is closed again here.
+ */
+ @Override
+ public void close() throws IOException {
+ closeCurrentScanner();
+ }
+
+ public String getTableName() {
+ return plan.getTableName();
+ }
+
+ /** Closes the child scanner that is currently open, if any, and forgets it. */
+ private void closeCurrentScanner() throws IOException {
+ if (currentScanner != null) {
+ currentScanner.close();
+ currentScanner = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 68a0e30..399903c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -26,6 +26,7 @@ import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.util.TUtil;
import java.util.*;
@@ -65,9 +66,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
visit(cnf, plan, block, selNode.getChild(), stack);
stack.pop();
- // remove the selection operator if there is no search condition
- // after selection push.
- if(cnf.size() == 0) {
+ if(cnf.size() == 0) { // remove the selection operator if there is no search condition after selection push.
LogicalNode node = stack.peek();
if (node instanceof UnaryNode) {
UnaryNode unary = (UnaryNode) node;
@@ -75,6 +74,21 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
} else {
throw new InvalidQueryException("Unexpected Logical Query Plan");
}
+ } else { // if there remain search conditions
+
+ // check if it can be evaluated here
+ Set<EvalNode> matched = TUtil.newHashSet();
+ for (EvalNode eachEval : cnf) {
+ if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
+ matched.add(eachEval);
+ }
+ }
+
+ // if there are search conditions which can be evaluated here, push down them and remove them from cnf.
+ if (matched.size() > 0) {
+ selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
+ cnf.removeAll(matched);
+ }
}
return selNode;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 7512f23..7aa37bd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -529,6 +529,7 @@ public class ProjectionPushDownRule extends
String joinQualReference = null;
if (node.hasJoinQual()) {
joinQualReference = newContext.addExpr(node.getJoinQual());
+ newContext.addNecessaryReferences(node.getJoinQual());
}
String [] referenceNames = null;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 0f4a62b..71ab8f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -29,15 +29,14 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.master.TaskSchedulerContext;
@@ -80,10 +79,10 @@ public class Repartitioner {
Path tablePath;
FileFragment[] fragments = new FileFragment[2];
- TableStats[] stats = new TableStats[2];
+ long[] stats = new long[2];
// initialize variables from the child operators
- for (int i =0; i < 2; i++) {
+ for (int i = 0; i < 2; i++) {
TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
if (tableDesc == null) { // if it is a real table stored on storage
// TODO - to be fixed (wrong directory)
@@ -92,16 +91,22 @@ public class Repartitioner {
childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
tablePath = storageManager.getTablePath(scans[i].getTableName());
- stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
+ stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
} else {
tablePath = tableDesc.getPath();
- stats[i] = tableDesc.getStats();
+ try {
+ stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
+ } catch (PlanningException e) {
+ throw new IOException(e);
+ }
fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(),
tablePath).get(0);
}
}
+ LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));
+
// Assigning either fragments or fetch urls to query units
boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
@@ -111,16 +116,13 @@ public class Repartitioner {
SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
schedulerContext.setEstimatedTaskNum(1);
} else if (leftSmall ^ rightSmall) {
- LOG.info("[Distributed Join Strategy] : Broadcast Join");
int broadcastIdx = leftSmall ? 0 : 1;
int baseScanIdx = leftSmall ? 1 : 0;
-
- LOG.info("Broadcasting Table Volume: " + stats[broadcastIdx].getNumBytes());
- LOG.info("Base Table Volume: " + stats[baseScanIdx].getNumBytes());
-
+ LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
+ scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments[broadcastIdx]);
} else {
- LOG.info("[Distributed Join Strategy] : Repartition Join");
+ LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
// The hash map is modeling as follows:
// <Part Id, <Table Name, Intermediate Data>>
Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
@@ -151,22 +153,19 @@ public class Repartitioner {
}
}
- LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
- LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
-
int [] avgSize = new int[2];
- avgSize[0] = (int) (stats[0].getNumBytes() / hashEntries.size());
- avgSize[1] = (int) (stats[1].getNumBytes() / hashEntries.size());
+ avgSize[0] = (int) (stats[0] / hashEntries.size());
+ avgSize[1] = (int) (stats[1] / hashEntries.size());
int bothFetchSize = avgSize[0] + avgSize[1];
// Getting the desire number of join tasks according to the volumn
// of a larger table
- int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
+ int largerIdx = stats[0] >= stats[1] ? 0 : 1;
int desireJoinTaskVolumn = subQuery.getContext().getConf().
getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
// calculate the number of tasks according to the data size
- int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);
+ int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
LOG.info("Larger intermediate data is approximately " + mb + " MB");
// determine the number of task per 64MB
int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
@@ -190,20 +189,39 @@ public class Repartitioner {
}
}
+ /**
+ * Creates the file fragments for every input path of a partitioned table scan
+ * (presumably one path per selected partition).
+ *
+ * <p>Note: this consumes the scan's input paths. They are reset to {@code null} once the
+ * fragments have been created, so the method must not be called twice for the same scan node.
+ *
+ * @param sm the storage manager used to split each input path
+ * @param scan the scan node; must be a {@link PartitionedTableScanNode}
+ * @param table the table descriptor whose meta and schema are used to split the input paths
+ * @return the fragments of all input paths, in input-path order
+ * @throws IOException if splitting any input path fails
+ * @throws IllegalArgumentException if {@code scan} is not a {@link PartitionedTableScanNode}
+ * @throws IllegalStateException if the scan's input paths have already been consumed
+ */
+ public static List<FileFragment> getFragmentsFromPartitionedTable(AbstractStorageManager sm,
+ ScanNode scan,
+ TableDesc table) throws IOException {
+ // This method is public now, so validate the node type instead of failing with a ClassCastException.
+ Preconditions.checkArgument(scan instanceof PartitionedTableScanNode,
+ "Not a partitioned table scan: %s", scan);
+ PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+ // The input paths are cleared below, so null means this scan node has already been consumed.
+ Preconditions.checkState(partitionsScan.getInputPaths() != null,
+ "Input paths of %s have already been consumed", scan.getCanonicalName());
+
+ List<FileFragment> fragments = Lists.newArrayList();
+ for (Path path : partitionsScan.getInputPaths()) {
+ fragments.addAll(sm.getSplits(
+ scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
+ }
+ partitionsScan.setInputPaths(null);
+ return fragments;
+ }
+
private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
int baseScanId, FileFragment broadcasted) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
TableMeta meta;
- Path inputPath;
ScanNode scan = scans[baseScanId];
TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
- inputPath = desc.getPath();
meta = desc.getMeta();
- List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
- inputPath);
+ Collection<FileFragment> fragments;
+ if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+ fragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+ } else {
+ fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+ desc.getPath());
+ }
SubQuery.scheduleFragments(subQuery, fragments, broadcasted);
schedulerContext.setEstimatedTaskNum(fragments.size());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 02c2f6a..83a593a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -44,7 +44,10 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -673,9 +676,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
} else { // Case 3: Others (Sort or Aggregation)
int numTasks = getNonLeafTaskNum(subQuery);
-// ExecutionBlockId childId = masterPlan.getChilds(subQuery.getBlock()).get(0).getId();
-// SubQuery child = subQuery.context.getSubQuery(childId);
-// DataChannel channel = masterPlan.getChannel(child.getId(), subQuery.getId());
Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks);
}
}
@@ -747,22 +747,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.eventHandler.handle(event);
}
- /**
- * It creates a number of fragments for all partitions.
- */
- private static Collection<FileFragment> getFragmentsFromPartitionedTable(SubQuery subQuery,
- ScanNode scan,
- TableDesc table) throws IOException {
- List<FileFragment> fragments = Lists.newArrayList();
- PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
- for (Path path : partitionsScan.getInputPaths()) {
- fragments.addAll(subQuery.getStorageManager().getSplits(
- scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
- }
- partitionsScan.setInputPaths(null);
- return fragments;
- }
-
private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
@@ -778,7 +762,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// Otherwise, it creates at least one fragments for a table, which may
// span a number of blocks or possibly consists of a number of files.
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- fragments = getFragmentsFromPartitionedTable(subQuery, scan, table);
+ fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
} else {
Path inputPath = table.getPath();
fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
@@ -811,7 +795,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.getId(), fragment));
}
- public static void scheduleFragments(SubQuery subQuery, List<FileFragment> leftFragments,
+ public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
FileFragment broadcastFragment) {
for (FileFragment eachLeafFragment : leftFragments) {
scheduleFragment(subQuery, eachLeafFragment, broadcastFragment);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index fa67eb6..ae59d11 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,10 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
@@ -53,7 +50,7 @@ public class LocalTajoTestingUtility {
/**
* for test
- * @return
+ * @return The generated QueryId
*/
public synchronized static QueryId newQueryId() {
return QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
@@ -82,6 +79,15 @@ public class LocalTajoTestingUtility {
Path dfsPath = new Path(tablePath, localPath.getName());
fs.copyFromLocalFile(localPath, dfsPath);
TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+
+ // Enable this to set pseudo table stats. It is disabled by default because it causes errors in
+ // some unit tests, so enable it manually only for the specific tests that need it.
+// TableStats stats = new TableStats();
+// stats.setNumBytes(TPCH.tableVolumes.get(names[i]));
+// TableDesc tableDesc = new TableDesc(names[i], schemas[i], meta, tablePath);
+// tableDesc.setStats(stats);
+// util.getMaster().getCatalog().addTable(tableDesc);
+
client.createExternalTable(names[i], schemas[i], tablePath, meta);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 24a4237..e1a231a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.tajo.algebra.CreateTable;
import org.apache.tajo.algebra.DropTable;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.OpType;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
@@ -92,7 +93,7 @@ import static org.junit.Assert.*;
* <code>QueryTestCaseBase</code> basically provides the following methods:
* <ul>
* <li><code>{@link #executeQuery()}</code> - executes a corresponding query and returns an ResultSet instance</li>
- * <li><code>{@link #executeQuery(String)}</code> - executes a given query file included in the corresponding query
+ * <li><code>{@link #executeFile(String)}</code> - executes a given query file included in the corresponding query
* file in the current class's query directory</li>
* <li><code>assertResultSet()</code> - check if the query result is equivalent to the expected result included
* in the corresponding result file in the current class's result directory.</li>
@@ -178,7 +179,7 @@ public class QueryTestCaseBase {
currentDatasetPath = new Path(datasetBasePath, className);
}
- protected ResultSet execute(String sql) throws Exception {
+ /**
+ * Executes the given SQL statement string through {@code testBase}.
+ *
+ * @param sql The SQL statement to execute.
+ * @return ResultSet of query execution.
+ */
+ protected ResultSet executeString(String sql) throws Exception {
return testBase.execute(sql);
}
@@ -189,7 +190,7 @@ public class QueryTestCaseBase {
* @return ResultSet of query execution.
*/
public ResultSet executeQuery() throws Exception {
- return executeQuery(name.getMethodName() + ".sql");
+ // The query file is named after the current test method name, e.g. testFoo -> testFoo.sql.
+ return executeFile(name.getMethodName() + ".sql");
}
/**
@@ -199,7 +200,7 @@ public class QueryTestCaseBase {
* @param queryFileName The file name to be used to execute a query.
* @return ResultSet of query execution.
*/
- public ResultSet executeQuery(String queryFileName) throws Exception {
+ public ResultSet executeFile(String queryFileName) throws Exception {
Path queryFilePath = getQueryFilePath(queryFileName);
FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
assertTrue(queryFilePath.toString() + " existence check", fs.exists(queryFilePath));
@@ -312,6 +313,10 @@ public class QueryTestCaseBase {
return StorageUtil.concatPath(currentDatasetPath, fileName);
}
+ public String executeDDL(String ddlFileName, @Nullable String [] args) throws Exception {
+ return executeDDL(ddlFileName, null, true, args);
+ }
+
/**
*
* Execute a data definition language (DDL) template. A general SQL DDL statement can be included in this file. But,
@@ -331,11 +336,14 @@ public class QueryTestCaseBase {
* @param args A list of arguments, each of which is used to replace corresponding variable which has a form of ${i}.
* @return The table name created
*/
- public String executeDDL(String ddlFileName, String dataFileName, String ... args) throws Exception {
+ public String executeDDL(String ddlFileName, @Nullable String dataFileName, @Nullable String ... args)
+ throws Exception {
+ // Public DDL helpers always run the template with isLocalTable = true.
return executeDDL(ddlFileName, dataFileName, true, args);
}
- private String executeDDL(String ddlFileName, String dataFileName, boolean isLocalTable, String ... args)
+ private String executeDDL(String ddlFileName, @Nullable String dataFileName, boolean isLocalTable,
+ @Nullable String [] args)
throws Exception {
Path ddlFilePath = new Path(currentQueryPath, ddlFileName);
@@ -386,7 +394,7 @@ public class QueryTestCaseBase {
* @param args The list argument to replace each corresponding format string ${i}. ${i} uses zero-based index.
* @return A string compiled
*/
- private String compileTemplate(String template, String dataFileName, String... args) {
+ private String compileTemplate(String template, @Nullable String dataFileName, @Nullable String ... args) {
String result;
if (dataFileName != null) {
result = template.replace("${table.path}", "\'" + dataFileName + "'");
@@ -394,8 +402,10 @@ public class QueryTestCaseBase {
result = template;
}
- for (int i = 0; i < args.length; i++) {
- result = result.replace("${" + i + "}", args[i]);
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ result = result.replace("${" + i + "}", args[i]);
+ }
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 6e73df8..9c96e0e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -251,7 +251,9 @@ public class TajoTestingCluster {
}
public void shutdownCatalogCluster() {
- this.catalogServer.shutdown();
+ // Nothing to shut down when no catalog server is set (e.g. it was never started).
+ if (catalogServer == null) {
+ return;
+ }
+ catalogServer.shutdown();
}
public MiniCatalogServer getMiniCatalogCluster() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
index ad3d676..cb1805d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -75,7 +75,8 @@ public class TpchTestBase {
for (int i = 0; i < names.length; i++) {
file = new File("src/test/tpch/" + names[i] + ".tbl");
if(!file.exists()) {
- file = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/src/test/tpch/" + names[i] + ".tbl");
+ file = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/src/test/tpch/" + names[i]
+ + ".tbl");
}
tables[i] = FileUtil.readTextFile(file).split("\n");
paths[i] = file.getAbsolutePath();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 07f726d..4aa1ba2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,8 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.TpchTestBase;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -35,8 +34,9 @@ import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -48,12 +48,12 @@ import static org.junit.Assert.assertTrue;
public class TestSortExec {
private static TajoConf conf;
private static final String TEST_PATH = "target/test-data/TestPhysicalPlanner";
+ private static TajoTestingCluster util;
private static CatalogService catalog;
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
private static AbstractStorageManager sm;
- private static TajoTestingCluster util;
private static Path workDir;
private static Path tablePath;
private static TableMeta employeeMeta;
@@ -63,8 +63,8 @@ public class TestSortExec {
@BeforeClass
public static void setUp() throws Exception {
conf = new TajoConf();
- util = new TajoTestingCluster();
- catalog = util.startCatalogCluster().getCatalog();
+ util = TpchTestBase.getInstance().getTestingCluster();
+ catalog = util.getMaster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
sm = StorageManagerFactory.getStorageManager(conf, workDir);
@@ -99,17 +99,12 @@ public class TestSortExec {
optimizer = new LogicalOptimizer(conf);
}
- @After
- public void tearDown() throws Exception {
- util.shutdownCatalogCluster();
- }
-
public static String[] QUERIES = {
"select managerId, empId, deptName from employee order by managerId, empId desc" };
@Test
public final void testNext() throws IOException, PlanningException {
- FileFragment[] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index ede0d58..9b940da 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -68,7 +68,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
assertEquals(5, desc.getStats().getNumRows().intValue());
- ResultSet res2 = executeQuery("check1.sql");
+ ResultSet res2 = executeFile("check1.sql");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
@@ -107,7 +107,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
assertEquals(5, desc.getStats().getNumRows().intValue());
- ResultSet res2 = executeQuery("check2.sql");
+ ResultSet res2 = executeFile("check2.sql");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
@@ -125,7 +125,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
@Test
public final void testCtasWithGroupby() throws Exception {
- ResultSet res = executeQuery("CtasWithGroupby.sql");
+ ResultSet res = executeFile("CtasWithGroupby.sql");
res.close();
ResultSet res2 = executeQuery();
@@ -135,7 +135,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
@Test
public final void testCtasWithOrderby() throws Exception {
- ResultSet res = executeQuery("CtasWithOrderby.sql");
+ ResultSet res = executeFile("CtasWithOrderby.sql");
res.close();
ResultSet res2 = executeQuery();
@@ -145,7 +145,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
@Test
public final void testCtasWithLimit() throws Exception {
- ResultSet res = executeQuery("CtasWithLimit.sql");
+ ResultSet res = executeFile("CtasWithLimit.sql");
res.close();
ResultSet res2 = executeQuery();
@@ -155,7 +155,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
@Test
public final void testCtasWithUnion() throws Exception {
- ResultSet res = executeQuery("CtasWithUnion.sql");
+ ResultSet res = executeFile("CtasWithUnion.sql");
res.close();
ResultSet res2 = executeQuery();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
new file mode 100644
index 0000000..4bda517
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
+
+ /**
+ * Sets up the table from customer_ddl.sql and insert_into_customer.sql, then verifies this
+ * method's default query and several join queries against their expected result files.
+ */
+ @Test
+ public void testPartitionTableJoinSmallTable() throws Exception {
+ // customer_ddl.sql needs neither a data file nor template arguments.
+ executeDDL("customer_ddl.sql", null);
+ ResultSet res = executeFile("insert_into_customer.sql");
+ res.close();
+
+ res = executeQuery();
+ try {
+ assertResultSet(res);
+ } finally {
+ res.close();
+ }
+
+ assertQueryFileResult("selfJoinOfPartitionedTable.sql", "selfJoinOfPartitionedTable.result");
+ assertQueryFileResult("testNoProjectionJoinQual.sql", "testNoProjectionJoinQual.result");
+ assertQueryFileResult("testPartialFilterPushDown.sql", "testPartialFilterPushDown.result");
+ }
+
+ /**
+ * Executes a query file and compares its result with an expected result file. The result set
+ * is closed even when the comparison fails, so a failing assertion does not leak it.
+ */
+ private void assertQueryFileResult(String queryFileName, String resultFileName) throws Exception {
+ ResultSet res = executeFile(queryFileName);
+ try {
+ assertResultSet(res, resultFileName);
+ } finally {
+ res.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 46c1d94..f775acb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -48,7 +48,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testCreateColumnPartitionedTable() throws Exception {
String tableName ="testCreateColumnPartitionedTable";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
res.close();
@@ -64,7 +64,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception {
String tableName ="testCreateColumnPartitionedTableWithSelectedColumns";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
@@ -72,7 +72,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
assertEquals(3, catalog.getTableDesc(tableName).getSchema().getColumnNum());
assertEquals(4, catalog.getTableDesc(tableName).getLogicalSchema().getColumnNum());
- res = execute("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
+ res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
"l_partkey, l_quantity from lineitem");
res.close();
}
@@ -80,20 +80,20 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testColumnPartitionedTableByOneColumn() throws Exception {
String tableName ="testColumnPartitionedTableByOneColumn";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(tableName));
- res = execute("insert overwrite into " + tableName
+ res = executeString("insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(tableName);
assertPartitionDirectories(desc);
- res = execute(
+ res = executeString(
"select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
Map<Double, int []> resultRows1 = Maps.newHashMap();
@@ -123,13 +123,13 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
String tableName ="testQueryCasesOnColumnPartitionedTable";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(tableName));
- res = execute(
+ res = executeString(
"insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
@@ -137,13 +137,17 @@ public class TestTablePartitions extends QueryTestCaseBase {
TableDesc desc = catalog.getTableDesc(tableName);
assertPartitionDirectories(desc);
- res = executeQuery("case1.sql");
+ res = executeFile("case1.sql");
assertResultSet(res, "case1.result");
res.close();
- res = executeQuery("case2.sql");
+ res = executeFile("case2.sql");
assertResultSet(res, "case2.result");
res.close();
+
+ res = executeFile("case3.sql");
+ assertResultSet(res, "case3.result");
+ res.close();
}
@Test
@@ -156,7 +160,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
CatalogService catalog = cluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(tableName));
- res = execute("insert overwrite into " + tableName
+ res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
@@ -178,7 +182,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
assertEquals(5, desc.getStats().getNumRows().intValue());
- res = execute("select * from " + tableName + " where col2 = 2");
+ res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
@@ -198,7 +202,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
- res = execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+ res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
for (int i = 0; i < 3; i++) {
assertTrue(res.next());
@@ -211,14 +215,14 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col2 int4, col3 float8) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION BY column(col1 int4)");
res.close();
assertTrue(catalog.existsTable(tableName));
- res = execute(
+ res = executeString(
"insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(tableName);
@@ -245,14 +249,14 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
String tableName = "testColumnPartitionedTableByTwoColumnsWithCompression";
- ResultSet res = execute("create table " + tableName + " (col3 float8, col4 text) USING csv " +
+ ResultSet res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION by column(col1 int4, col2 int4)");
res.close();
assertTrue(catalog.existsTable(tableName));
- res = execute(
+ res = executeString(
"insert overwrite into " + tableName +
" select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
res.close();
@@ -287,7 +291,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
String tableName = "testColumnPartitionedTableByThreeColumnsWithCompression";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
@@ -295,7 +299,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
assertTrue(catalog.existsTable(tableName));
- res = execute(
+ res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
@@ -333,7 +337,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
}
}
- res = execute("select * from " + tableName + " where col2 = 2");
+ res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
@@ -353,7 +357,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
- res = execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+ res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
i = 0;
while(res.next()) {
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
@@ -368,7 +372,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
@Test
public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
String tableName = "testColumnPartitionedTableNoMatchedPartition";
- ResultSet res = execute(
+ ResultSet res = executeString(
"create table " + tableName + " (col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
@@ -376,7 +380,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
assertTrue(catalog.existsTable(tableName));
- res = execute(
+ res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
@@ -414,7 +418,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
}
}
- res = execute("select * from " + tableName + " where col2 = 9");
+ res = executeString("select * from " + tableName + " where col2 = 9");
assertFalse(res.next());
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
index 10fa423..0ebeaab 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
@@ -1 +1,8 @@
-select l_orderkey + l_partkey as unique_key from lineitem group by l_orderkey + l_partkey;
\ No newline at end of file
+select
+ l_orderkey + l_partkey as unique_key
+from
+ lineitem
+group by
+ l_orderkey + l_partkey
+order by
+ unique_key;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
index e7bbcac..eb86ce0 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
@@ -1 +1,10 @@
-select sum(l_orderkey) + sum(l_partkey) as total from lineitem group by l_orderkey + l_partkey;
\ No newline at end of file
+select * from (
+select
+ sum(l_orderkey) + sum(l_partkey) as total
+from
+ lineitem
+group by
+ l_orderkey + l_partkey
+) t1
+order by
+ total;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
new file mode 100644
index 0000000..ca43710
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
@@ -0,0 +1,9 @@
+CREATE TABLE customer_parts (
+ c_custkey INT4,
+ c_name TEXT,
+ c_address TEXT,
+ c_phone TEXT,
+ c_acctbal FLOAT8,
+ c_mktsegment TEXT,
+ c_comment TEXT
+) PARTITION BY COLUMN (c_nationkey INT4);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
new file mode 100644
index 0000000..29152b6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
@@ -0,0 +1,11 @@
+INSERT OVERWRITE INTO customer_parts
+ SELECT
+ c_custkey,
+ c_name,
+ c_address,
+ c_phone,
+ c_acctbal,
+ c_mktsegment,
+ c_comment,
+ c_nationkey
+ FROM customer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
new file mode 100644
index 0000000..988c0e9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
@@ -0,0 +1,9 @@
+select
+ t1.c_nationkey,
+ t2.c_nationkey
+from
+ customer_parts t1, customer_parts t2
+where
+ t1.c_nationkey = t2.c_nationkey
+order by
+ t1.c_nationkey desc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
new file mode 100644
index 0000000..1db9c1c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
@@ -0,0 +1 @@
+select count(*) from customer_parts t1, customer_parts t2 where t1.c_nationkey = t2.c_nationkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
new file mode 100644
index 0000000..fc9063a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
@@ -0,0 +1,9 @@
+select
+ upper(c_name) as c_name, count(1)
+from
+ customer_parts
+where
+ c_name is not null and
+ c_nationkey = 1
+group by
+ c_name;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
new file mode 100644
index 0000000..27e2066
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
@@ -0,0 +1,11 @@
+select
+ c_custkey,
+ c_name,
+ c_nationkey,
+ n_nationkey
+from
+ customer_parts, nation
+where
+ c_nationkey = n_nationkey
+order by
+ c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
index 221b8a9..ccaa5fb 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
@@ -1 +1,8 @@
-select c_custkey, orders.o_orderkey from orders full outer join customer on c_custkey = o_orderkey;
\ No newline at end of file
+select
+ c_custkey,
+ orders.o_orderkey
+from
+ orders full outer join customer on c_custkey = o_orderkey
+order by
+ c_custkey,
+ orders.o_orderkey;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
index 59a876a..f946e1d 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
@@ -1 +1,7 @@
-select c_custkey, orders.o_orderkey from customer left outer join orders on c_custkey = o_orderkey;
\ No newline at end of file
+select
+ c_custkey,
+ orders.o_orderkey
+from
+ customer left outer join orders on c_custkey = o_orderkey
+order by
+ c_custkey, o_orderkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
index 334c161..f5b0ba7 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
@@ -1 +1,9 @@
-select c_custkey, orders.o_orderkey, 'val' as val from customer left outer join orders on c_custkey = o_orderkey;
\ No newline at end of file
+select
+ c_custkey,
+ orders.o_orderkey,
+ 'val' as val
+from
+ customer left outer join orders on c_custkey = o_orderkey
+order by
+ c_custkey,
+ o_orderkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
index 3256e28..7333d54 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
@@ -1 +1,9 @@
-select c_custkey, o.o_orderkey, 'val' as val from customer left outer join (select * from orders) o on c_custkey = o.o_orderkey
\ No newline at end of file
+select
+ c_custkey,
+ o.o_orderkey,
+ 'val' as val
+from
+ customer left outer join (select * from orders) o on c_custkey = o.o_orderkey
+order by
+ c_custkey,
+ o_orderkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
index 03cdae2..90be13b 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
@@ -12,4 +12,6 @@ left outer join (
group by
c_custkey)
b
-on a.c_custkey = b.c_custkey;
\ No newline at end of file
+on a.c_custkey = b.c_custkey
+order by
+ c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
index cf9896d..ba4c713 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
@@ -1 +1,8 @@
-select c_custkey, orders.o_orderkey from orders right outer join customer on c_custkey = o_orderkey;
\ No newline at end of file
+select
+ c_custkey,
+ orders.o_orderkey
+from
+ orders right outer join customer on c_custkey = o_orderkey
+order by
+ c_custkey,
+ orders.o_orderkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
new file mode 100644
index 0000000..6cb1ea1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
@@ -0,0 +1,8 @@
+select
+ l.l_orderkey,
+ p.col1,
+ key
+ from lineitem as l, testQueryCasesOnColumnPartitionedTable as p
+where
+ (key = 45.0 or key = 38.0) and l.l_orderkey = p.col1;
+
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
index 09b14eb..b2d7dce 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
@@ -1,6 +1,17 @@
-CREATE TABLE sales ( col1 int, col2 int)
-PARTITION BY COLUMN (col3 int, col4 float, col5 text)
-AS SELECT col1, col2, col3, col4, col5 FROM sales_src
- WHERE col1 > 16
+CREATE TABLE sales (
+ col1 int,
+ col2 int)
+PARTITION BY COLUMN (col3 int, col4 float, col5 text) AS
+
+SELECT
+ col1,
+ col2,
+ col3,
+ col4,
+ col5
+FROM
+ sales_src
+WHERE
+ col1 > 16
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
index 85c5598..788a084 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
@@ -2,5 +2,5 @@ unique_key
-------------------------------
2
4
-6
-5
\ No newline at end of file
+5
+6
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
index edae882..8656add 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
@@ -2,5 +2,5 @@ total
-------------------------------
4
4
-6
-5
\ No newline at end of file
+5
+6
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
new file mode 100644
index 0000000..ea4a06b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
@@ -0,0 +1,7 @@
+c_nationkey,c_nationkey
+-------------------------------
+15,15
+13,13
+4,4
+3,3
+1,1
\ No newline at end of file