Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/07 08:32:51 UTC

[2/2] git commit: TAJO-583: Broadcast join does not work on partitioned tables.

TAJO-583: Broadcast join does not work on partitioned tables.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/4179a7c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/4179a7c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/4179a7c9

Branch: refs/heads/master
Commit: 4179a7c9265123bae0a81cacd5eee53a96f98836
Parents: 009e025
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Feb 7 14:01:45 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Feb 7 14:02:04 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  31 +++++-
 .../apache/tajo/engine/planner/PlanString.java  |  14 +++
 .../apache/tajo/engine/planner/PlannerUtil.java |  87 +++++++++++----
 .../engine/planner/global/ExecutionBlock.java   |   5 +-
 .../engine/planner/global/GlobalPlanner.java    |  50 +++++++--
 .../tajo/engine/planner/global/MasterPlan.java  |   9 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   2 -
 .../physical/PartitionMergeScanExec.java        | 107 +++++++++++++++++++
 .../planner/rewrite/FilterPushDownRule.java     |  20 +++-
 .../planner/rewrite/ProjectionPushDownRule.java |   1 +
 .../tajo/master/querymaster/Repartitioner.java  |  68 +++++++-----
 .../tajo/master/querymaster/SubQuery.java       |  28 ++---
 .../apache/tajo/LocalTajoTestingUtility.java    |  16 ++-
 .../java/org/apache/tajo/QueryTestCaseBase.java |  28 +++--
 .../org/apache/tajo/TajoTestingCluster.java     |   4 +-
 .../test/java/org/apache/tajo/TpchTestBase.java |   3 +-
 .../engine/planner/physical/TestSortExec.java   |  19 ++--
 .../apache/tajo/engine/query/TestCTASQuery.java |  12 +--
 .../query/TestJoinOnPartitionedTables.java      |  50 +++++++++
 .../tajo/engine/query/TestTablePartitions.java  |  52 ++++-----
 .../TestGroupByQuery/testGroupByNested1.sql     |   9 +-
 .../TestGroupByQuery/testGroupByNested2.sql     |  11 +-
 .../customer_ddl.sql                            |   9 ++
 .../insert_into_customer.sql                    |  11 ++
 .../selfJoinOfPartitionedTable.sql              |   9 ++
 .../testNoProjectionJoinQual.sql                |   1 +
 .../testPartialFilterPushDown.sql               |   9 ++
 .../testPartitionTableJoinSmallTable.sql        |  11 ++
 .../TestJoinQuery/testFullOuterJoin1.sql        |   9 +-
 .../TestJoinQuery/testLeftOuterJoin1.sql        |   8 +-
 .../testLeftOuterJoinWithConstantExpr1.sql      |  10 +-
 .../testLeftOuterJoinWithConstantExpr2.sql      |  10 +-
 .../testLeftOuterJoinWithConstantExpr3.sql      |   4 +-
 .../TestJoinQuery/testRightOuterJoin1.sql       |   9 +-
 .../queries/TestTablePartitions/case3.sql       |   8 ++
 .../create_partitioned_table_as_select.sql      |  19 +++-
 .../TestGroupByQuery/testGroupByNested1.result  |   4 +-
 .../TestGroupByQuery/testGroupByNested2.result  |   4 +-
 .../selfJoinOfPartitionedTable.result           |   7 ++
 .../testNoProjectionJoinQual.result             |   3 +
 .../testPartialFilterPushDown.result            |   3 +
 .../testPartitionTableJoinSmallTable.result     |   7 ++
 .../results/TestTablePartitions/case3.result    |   5 +
 .../java/org/apache/tajo/storage/CSVFile.java   |   2 +-
 46 files changed, 632 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36c1062..8dc5ee5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -246,7 +246,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
-    TAJO-588: In some case, leaf task of DefaultTaskScheduler are not 
+    TAJO-583: Broadcast join does not work on partitioned tables. (hyunsik)
+
+    TAJO-588: In some case, leaf task of DefaultTaskScheduler are not
     distributed execution. (jinho)
 
     TAJO-586: containFunction shouldn't throw NoSuchFunctionException. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index e9d8219..cc694d4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -168,7 +168,7 @@ public class TUtil {
   }
 
   public static String collectionToString(Collection objects) {
-    boolean first = false;
+    boolean first = true;
     StringBuilder sb = new StringBuilder();
     for(Object object : objects) {
       if (first) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index b8c52c0..dfa2e40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -139,8 +139,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
         ProjectionExec projectionExec = new ProjectionExec(ctx, subQueryNode, leftExec);
         return projectionExec;
-
       }
+
       case PARTITIONS_SCAN:
       case SCAN:
         leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
@@ -749,6 +749,35 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
         "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
 
+    Enforcer enforcer = ctx.getEnforcer();
+
+    // check if this table is broadcasted one or not.
+    boolean broadcastFlag = false;
+    if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+      List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+      for (EnforceProperty property : properties) {
+        broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+      }
+    }
+
+    if (scanNode instanceof PartitionedTableScanNode
+        && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
+        ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
+
+      if (scanNode instanceof PartitionedTableScanNode) {
+        if (broadcastFlag) {
+          PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+          List<FileFragment> fileFragments = TUtil.newList();
+          for (Path path : partitionedTableScanNode.getInputPaths()) {
+            fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+          }
+
+          return new PartitionMergeScanExec(ctx, sm, scanNode,
+              FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+        }
+      }
+    }
+
     FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
     return new SeqScanExec(ctx, sm, scanNode, fragments);
   }
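
A minimal plain-Java sketch of the decision added above (the helper name and the
List/array parameters are illustrative stand-ins, not Tajo APIs): a scan is read
through PartitionMergeScanExec only when the enforcer marks its table as broadcast
and the node is a partitioned-table scan that still carries input paths; every other
scan keeps the ordinary SeqScanExec over the pre-assigned fragments.

    import java.util.List;

    class BroadcastScanCheck {
      // Returns true when the merging scanner should be used instead of SeqScanExec.
      static boolean useMergeScan(String tableName, List<String> broadcastTables, String[] inputPaths) {
        boolean broadcastFlag = broadcastTables.contains(tableName);      // enforcer BROADCAST property
        boolean partitionedWithInputs = inputPaths != null && inputPaths.length > 0;
        return broadcastFlag && partitionedWithInputs;
      }
    }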

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
index ef8bed0..4ecad60 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
@@ -96,4 +96,18 @@ public class PlanString {
       currentDetail = null;
     }
   }
+
+  public String toString() {
+    StringBuilder output = new StringBuilder();
+    output.append(getTitle()).append("\n");
+
+    for (String str : getExplanations()) {
+      output.append("  => ").append(str).append("\n");
+    }
+
+    for (String str : getDetails()) {
+      output.append("  => ").append(str).append("\n");
+    }
+    return output.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index bd9001c..b59cdda 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -34,6 +34,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.TUtil;
 
@@ -162,41 +163,81 @@ public class PlannerUtil {
       this.tobeReplaced = tobeReplaced;
     }
 
+    /**
+     * If this node can have children, it returns TRUE. Otherwise, it returns FALSE.
+     */
+    private static boolean checkIfVisitable(LogicalNode node) {
+      return node instanceof UnaryNode || node instanceof BinaryNode;
+    }
+
     @Override
     public LogicalNode visit(ReplacerContext context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block,
                              LogicalNode node, Stack<LogicalNode> stack) throws PlanningException {
-      LogicalNode child = super.visit(context, plan, null, node, stack);
-
-      if (node.deepEquals(target)) {
-        LogicalNode parent = stack.peek();
+      LogicalNode left = null;
+      LogicalNode right = null;
 
-        if (parent instanceof BinaryNode) {
-          BinaryNode binaryParent = (BinaryNode) parent;
-          if (binaryParent.getLeftChild().deepEquals(target)) {
-            binaryParent.setLeftChild(tobeReplaced);
-          }
-          if (binaryParent.getRightChild().deepEquals(target)) {
-            binaryParent.setRightChild(tobeReplaced);
-          }
-        } else if (parent instanceof UnaryNode) {
-          UnaryNode unaryParent = (UnaryNode) parent;
-          unaryParent.setChild(tobeReplaced);
+      if (node instanceof UnaryNode) {
+        UnaryNode unaryNode = (UnaryNode) node;
+        if (unaryNode.getChild().deepEquals(target)) {
+          unaryNode.setChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(unaryNode.getChild())) {
+          left = visit(context, plan, null, unaryNode.getChild(), stack);
+        }
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binaryNode = (BinaryNode) node;
+        if (binaryNode.getLeftChild().deepEquals(target)) {
+          binaryNode.setLeftChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getLeftChild())) {
+          left = visit(context, plan, null, binaryNode.getLeftChild(), stack);
+        } else {
+          left = binaryNode.getLeftChild();
         }
 
-        context.updateSchemaFlag = true;
+        if (binaryNode.getRightChild().deepEquals(target)) {
+          binaryNode.setRightChild(tobeReplaced);
+          right = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getRightChild())) {
+          right = visit(context, plan, null, binaryNode.getRightChild(), stack);
+        } else {
+          right = binaryNode.getRightChild();
+        }
       }
 
-      if (context.updateSchemaFlag && !node.deepEquals(target)) {
+      // update schemas of nodes except for leaf node (i.e., RelationNode)
+      if (context.updateSchemaFlag) {
         if (node instanceof Projectable) {
-          node.setInSchema(child.getOutSchema());
+          if (node instanceof BinaryNode) {
+            node.setInSchema(SchemaUtil.merge(left.getOutSchema(), right.getOutSchema()));
+          } else {
+            node.setInSchema(left.getOutSchema());
+          }
           context.updateSchemaFlag = false;
         } else {
-          node.setInSchema(child.getOutSchema());
-          node.setOutSchema(child.getOutSchema());
+          node.setInSchema(left.getOutSchema());
+          node.setOutSchema(left.getOutSchema());
         }
       }
       return node;
     }
+
+    @Override
+    public LogicalNode visitScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                                 Stack<LogicalNode> stack) throws PlanningException {
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitPartitionedTableScan(ReplacerContext context, LogicalPlan plan,
+                                                 LogicalPlan.QueryBlock block,
+                                                 PartitionedTableScanNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+      return node;
+    }
   }
   
   public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) {
@@ -632,7 +673,11 @@ public class PlannerUtil {
   public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
     try {
       T copy = (T) node.clone();
-      copy.setPID(plan.newPID());
+      if (plan == null) {
+        copy.setPID(-1);
+      } else {
+        copy.setPID(plan.newPID());
+      }
       return copy;
     } catch (CloneNotSupportedException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f3976e..7df6b43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -108,12 +108,9 @@ public class ExecutionBlock {
     return hasUnionPlan;
   }
 
-  public void addBroadcastTables(Collection<String> tableNames) {
-    broadcasted.addAll(tableNames);
-  }
-
   public void addBroadcastTable(String tableName) {
     broadcasted.add(tableName);
+    enforcer.addBroadcast(tableName);
   }
 
   public boolean isBroadcastTable(String tableName) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index cb2ac15..2aa93d7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -30,10 +30,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
@@ -42,6 +39,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import java.io.IOException;
 import java.util.*;
 
+import static org.apache.tajo.conf.TajoConf.ConfVars;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
 
 /**
@@ -55,7 +53,7 @@ public class GlobalPlanner {
 
   public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm) throws IOException {
     this.conf = conf;
-    this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
+    this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
     Preconditions.checkArgument(storeType != null);
   }
 
@@ -137,6 +135,42 @@ public class GlobalPlanner {
     return channel;
   }
 
+  /**
+   * It calculates the total volume of all descendent relation nodes.
+   */
+  public static long computeDescendentVolume(LogicalNode node) throws PlanningException {
+
+    if (node instanceof RelationNode) {
+      switch (node.getType()) {
+      case SCAN:
+      case PARTITIONS_SCAN:
+        ScanNode scanNode = (ScanNode) node;
+        if (scanNode.getTableDesc().getStats() == null) {
+          // TODO - this case means that the data is not located in HDFS, so we need an additional
+          // broadcast method.
+          return Long.MAX_VALUE;
+        } else {
+          return scanNode.getTableDesc().getStats().getNumBytes();
+        }
+      case TABLE_SUBQUERY:
+        return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery());
+      default:
+        throw new IllegalArgumentException("Not RelationNode");
+      }
+    } else if (node instanceof UnaryNode) {
+      return computeDescendentVolume(((UnaryNode) node).getChild());
+    } else if (node instanceof BinaryNode) {
+      BinaryNode binaryNode = (BinaryNode) node;
+      return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild());
+    }
+
+    throw new PlanningException("Invalid State");
+  }
+
+  private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) {
+    return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN;
+  }
+
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
                                        ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
@@ -149,7 +183,7 @@ public class GlobalPlanner {
     boolean leftBroadcasted = false;
     boolean rightBroadcasted = false;
 
-    if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
+    if (checkIfCanBeOneOfBroadcastJoin(leftNode) && checkIfCanBeOneOfBroadcastJoin(rightNode)) {
       ScanNode leftScan = (ScanNode) leftNode;
       ScanNode rightScan = (ScanNode) rightNode;
 
@@ -169,9 +203,13 @@ public class GlobalPlanner {
         currentBlock.setPlan(joinNode);
         if (leftBroadcasted) {
           currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+          LOG.info("The left table " + rightScan.getCanonicalName() + " ("
+              + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
         }
         if (rightBroadcasted) {
           currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+          LOG.info("The right table " + rightScan.getCanonicalName() + " ("
+              + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
         }
 
         context.execBlockMap.remove(leftScan.getPID());
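
A minimal sketch of how the volume computed by computeDescendentVolume() above can feed
a broadcast decision (the threshold value and method below are hypothetical and not part
of this patch):

    class BroadcastDecisionSketch {
      // Assumed threshold: relations at or below this size are cheap enough to broadcast.
      static final long BROADCAST_THRESHOLD_BYTES = 5L * 1024 * 1024;

      static boolean shouldBroadcast(long descendentVolumeBytes) {
        // Relations without statistics are reported as Long.MAX_VALUE above,
        // so they are never chosen for broadcast.
        return descendentVolumeBytes <= BROADCAST_THRESHOLD_BYTES;
      }
    }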

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index f6ac2be..91f658d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -27,6 +27,7 @@ import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -233,14 +234,20 @@ public class MasterPlan {
         continue;
       }
 
+      if (block.getBroadcastTables().size() > 0) {
+        sb.append("\nBroadcasted Tables: ").append(TUtil.collectionToString(block.getBroadcastTables()));
+        sb.append("\n");
+      }
+
       if (!isLeaf(block)) {
         sb.append("\n[Incoming]\n");
         for (DataChannel channel : getIncomingChannels(block.getId())) {
           sb.append(channel).append("\n");
         }
       }
-      sb.append("\n[Outgoing]\n");
+
       if (!isRoot(block)) {
+        sb.append("\n[Outgoing]\n");
         for (DataChannel channel : getOutgoingChannels(block.getId())) {
           sb.append(channel);
           sb.append("\n");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index d0c9897..314b3d9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -62,7 +62,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   protected final Projector projector;
 
   private int rightNumCols;
-  private int leftNumCols;
   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
 
   public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
@@ -94,7 +93,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     outTuple = new VTuple(outSchema.getColumnNum());
     leftKeyTuple = new VTuple(leftKeyList.length);
 
-    leftNumCols = leftChild.getSchema().getColumnNum();
     rightNumCols = rightChild.getSchema().getColumnNum();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
new file mode 100644
index 0000000..a39f4be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A Scanner that reads multiple partitions
+ */
+public class PartitionMergeScanExec extends PhysicalExec {
+  private final ScanNode plan;
+  private SeqScanExec currentScanner = null;
+
+  private CatalogProtos.FragmentProto [] fragments;
+
+  private List<SeqScanExec> scanners = Lists.newArrayList();
+  private Iterator<SeqScanExec> iterator;
+
+  private AbstractStorageManager sm;
+
+  public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+                                ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.plan = plan;
+    this.fragments = fragments;
+    this.sm = sm;
+  }
+
+  public void init() throws IOException {
+    for (CatalogProtos.FragmentProto fragment : fragments) {
+      scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
+          new CatalogProtos.FragmentProto[] {fragment}));
+    }
+    rescan();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while (currentScanner != null) {
+      tuple = currentScanner.next();
+
+      if (tuple != null) {
+        return tuple;
+      }
+
+      if (iterator.hasNext()) {
+        if (currentScanner != null) {
+          currentScanner.close();
+        }
+        currentScanner = iterator.next();
+        currentScanner.init();
+      } else {
+        break;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    if (scanners.size() > 0) {
+      iterator = scanners.iterator();
+      currentScanner = iterator.next();
+      currentScanner.init();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (SeqScanExec scanner : scanners) {
+      scanner.close();
+    }
+  }
+
+  public String getTableName() {
+    return plan.getTableName();
+  }
+}
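
A short usage sketch of the new scanner (the surrounding setup is omitted; init(), next(),
and close() are the methods defined above, and imports match the class file):

    // Drains a PartitionMergeScanExec produced by the physical planner. The caller sees
    // one continuous tuple stream even though each partition is read by its own SeqScanExec.
    static long countRows(PartitionMergeScanExec exec) throws IOException {
      exec.init();                  // opens the scanner over the first partition
      long rows = 0;
      Tuple tuple;
      while ((tuple = exec.next()) != null) {
        rows++;
      }
      exec.close();                 // closes every per-partition scanner
      return rows;
    }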

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 68a0e30..399903c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -26,6 +26,7 @@ import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.util.TUtil;
 
 import java.util.*;
 
@@ -65,9 +66,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     visit(cnf, plan, block, selNode.getChild(), stack);
     stack.pop();
 
-    // remove the selection operator if there is no search condition
-    // after selection push.
-    if(cnf.size() == 0) {
+    if(cnf.size() == 0) { // remove the selection operator if there is no search condition after selection push.
       LogicalNode node = stack.peek();
       if (node instanceof UnaryNode) {
         UnaryNode unary = (UnaryNode) node;
@@ -75,6 +74,21 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
       } else {
         throw new InvalidQueryException("Unexpected Logical Query Plan");
       }
+    } else { // if there remain search conditions
+
+      // check if it can be evaluated here
+      Set<EvalNode> matched = TUtil.newHashSet();
+      for (EvalNode eachEval : cnf) {
+        if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
+          matched.add(eachEval);
+        }
+      }
+
+      // if there are search conditions that can be evaluated here, push them down and remove them from cnf.
+      if (matched.size() > 0) {
+        selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
+        cnf.removeAll(matched);
+      }
     }
 
     return selNode;
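
Illustrative restatement of the partial push-down above, using plain strings in place of
EvalNodes (the class and the predicate test below are hypothetical):

    import java.util.*;

    class PartialPushDownSketch {
      // Moves the conjuncts evaluable at the selection node into its qual and
      // leaves the rest in cnf so they keep propagating to upper operators.
      static Set<String> takeEvaluableConjuncts(Set<String> cnf, Set<String> evaluableHere) {
        Set<String> matched = new HashSet<String>();
        for (String conjunct : cnf) {
          if (evaluableHere.contains(conjunct)) {   // stands in for checkIfBeEvaluatedAtThis()
            matched.add(conjunct);
          }
        }
        cnf.removeAll(matched);
        return matched;
      }
    }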

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 7512f23..7aa37bd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -529,6 +529,7 @@ public class ProjectionPushDownRule extends
     String joinQualReference = null;
     if (node.hasJoinQual()) {
       joinQualReference = newContext.addExpr(node.getJoinQual());
+      newContext.addNecessaryReferences(node.getJoinQual());
     }
 
     String [] referenceNames = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 0f4a62b..71ab8f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -29,15 +29,14 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
 import org.apache.tajo.engine.planner.UniformRangePartition;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.master.TaskSchedulerContext;
@@ -80,10 +79,10 @@ public class Repartitioner {
 
     Path tablePath;
     FileFragment[] fragments = new FileFragment[2];
-    TableStats[] stats = new TableStats[2];
+    long[] stats = new long[2];
 
     // initialize variables from the child operators
-    for (int i =0; i < 2; i++) {
+    for (int i = 0; i < 2; i++) {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
       if (tableDesc == null) { // if it is a real table stored on storage
         // TODO - to be fixed (wrong directory)
@@ -92,16 +91,22 @@ public class Repartitioner {
         childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
 
         tablePath = storageManager.getTablePath(scans[i].getTableName());
-        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
+        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
       } else {
         tablePath = tableDesc.getPath();
-        stats[i] = tableDesc.getStats();
+        try {
+          stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
+        } catch (PlanningException e) {
+          throw new IOException(e);
+        }
         fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(),
             tablePath).get(0);
       }
     }
 
+    LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0], stats[1]));
+
     // Assigning either fragments or fetch urls to query units
     boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
     boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
@@ -111,16 +116,13 @@ public class Repartitioner {
       SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
       schedulerContext.setEstimatedTaskNum(1);
     } else if (leftSmall ^ rightSmall) {
-      LOG.info("[Distributed Join Strategy] : Broadcast Join");
       int broadcastIdx = leftSmall ? 0 : 1;
       int baseScanIdx = leftSmall ? 1 : 0;
-
-      LOG.info("Broadcasting Table Volume: " + stats[broadcastIdx].getNumBytes());
-      LOG.info("Base Table Volume: " + stats[baseScanIdx].getNumBytes());
-
+      LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
+          scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
       scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments[broadcastIdx]);
     } else {
-      LOG.info("[Distributed Join Strategy] : Repartition Join");
+      LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
       // The hash map is modeling as follows:
       // <Part Id, <Table Name, Intermediate Data>>
       Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
@@ -151,22 +153,19 @@ public class Repartitioner {
         }
       }
 
-      LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
-      LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
-
       int [] avgSize = new int[2];
-      avgSize[0] = (int) (stats[0].getNumBytes() / hashEntries.size());
-      avgSize[1] = (int) (stats[1].getNumBytes() / hashEntries.size());
+      avgSize[0] = (int) (stats[0] / hashEntries.size());
+      avgSize[1] = (int) (stats[1] / hashEntries.size());
       int bothFetchSize = avgSize[0] + avgSize[1];
 
       // Getting the desire number of join tasks according to the volumn
       // of a larger table
-      int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
+      int largerIdx = stats[0] >= stats[1] ? 0 : 1;
       int desireJoinTaskVolumn = subQuery.getContext().getConf().
           getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
 
       // calculate the number of tasks according to the data size
-      int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);
+      int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
       LOG.info("Larger intermediate data is approximately " + mb + " MB");
       // determine the number of task per 64MB
       int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
@@ -190,20 +189,39 @@ public class Repartitioner {
     }
   }
 
+  /**
+   * It creates a number of fragments for all partitions.
+   */
+  public static List<FileFragment> getFragmentsFromPartitionedTable(AbstractStorageManager sm,
+                                                                          ScanNode scan,
+                                                                          TableDesc table) throws IOException {
+    List<FileFragment> fragments = Lists.newArrayList();
+    PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+    for (Path path : partitionsScan.getInputPaths()) {
+      fragments.addAll(sm.getSplits(
+          scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
+    }
+    partitionsScan.setInputPaths(null);
+    return fragments;
+  }
+
   private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
                                                           int baseScanId, FileFragment broadcasted) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
     Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
     TableMeta meta;
-    Path inputPath;
     ScanNode scan = scans[baseScanId];
     TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
-    inputPath = desc.getPath();
     meta = desc.getMeta();
 
-    List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
-        inputPath);
+    Collection<FileFragment> fragments;
+    if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+      fragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+    } else {
+      fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+          desc.getPath());
+    }
 
     SubQuery.scheduleFragments(subQuery, fragments, broadcasted);
     schedulerContext.setEstimatedTaskNum(fragments.size());
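
A worked example of the join-task sizing shown earlier in this file (the numbers are
hypothetical; ConfVars.DIST_QUERY_JOIN_TASK_VOLUME is expressed in MB):

    long largerVolumeBytes = 300L * 1048576;   // suppose the larger side is about 300 MB
    int desireJoinTaskVolumn = 64;             // per-task volume from DIST_QUERY_JOIN_TASK_VOLUME
    int mb = (int) Math.ceil((double) largerVolumeBytes / 1048576);        // 300
    int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);  // ceil(300 / 64) = 5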

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 02c2f6a..83a593a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -44,7 +44,10 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -673,9 +676,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
       } else { // Case 3: Others (Sort or Aggregation)
         int numTasks = getNonLeafTaskNum(subQuery);
-//        ExecutionBlockId childId = masterPlan.getChilds(subQuery.getBlock()).get(0).getId();
-//        SubQuery child = subQuery.context.getSubQuery(childId);
-//        DataChannel channel = masterPlan.getChannel(child.getId(), subQuery.getId());
         Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks);
       }
     }
@@ -747,22 +747,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       subQuery.eventHandler.handle(event);
     }
 
-    /**
-     * It creates a number of fragments for all partitions.
-     */
-    private static Collection<FileFragment> getFragmentsFromPartitionedTable(SubQuery subQuery,
-                                                                      ScanNode scan,
-                                                                      TableDesc table) throws IOException {
-      List<FileFragment> fragments = Lists.newArrayList();
-      PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
-      for (Path path : partitionsScan.getInputPaths()) {
-        fragments.addAll(subQuery.getStorageManager().getSplits(
-            scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
-      }
-      partitionsScan.setInputPaths(null);
-      return fragments;
-    }
-
     private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
       ExecutionBlock execBlock = subQuery.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
@@ -778,7 +762,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       // Otherwise, it creates at least one fragments for a table, which may
       // span a number of blocks or possibly consists of a number of files.
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
-        fragments = getFragmentsFromPartitionedTable(subQuery, scan, table);
+        fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
       } else {
         Path inputPath = table.getPath();
         fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
@@ -811,7 +795,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.getId(), fragment));
   }
 
-  public static void scheduleFragments(SubQuery subQuery, List<FileFragment> leftFragments,
+  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
                                        FileFragment broadcastFragment) {
     for (FileFragment eachLeafFragment : leftFragments) {
       scheduleFragment(subQuery, eachLeafFragment, broadcastFragment);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index fa67eb6..ae59d11 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,10 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
@@ -53,7 +50,7 @@ public class LocalTajoTestingUtility {
 
   /**
    * for test
-   * @return
+   * @return The generated QueryId
    */
   public synchronized static QueryId newQueryId() {
     return QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
@@ -82,6 +79,15 @@ public class LocalTajoTestingUtility {
       Path dfsPath = new Path(tablePath, localPath.getName());
       fs.copyFromLocalFile(localPath, dfsPath);
       TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+
+      // Enable this if you want to set pseudo stats. But it will cause errors in some unit tests,
+      // so use it manually only for certain unit tests.
+//      TableStats stats = new TableStats();
+//      stats.setNumBytes(TPCH.tableVolumes.get(names[i]));
+//      TableDesc tableDesc = new TableDesc(names[i], schemas[i], meta, tablePath);
+//      tableDesc.setStats(stats);
+//      util.getMaster().getCatalog().addTable(tableDesc);
+
       client.createExternalTable(names[i], schemas[i], tablePath, meta);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 24a4237..e1a231a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.tajo.algebra.CreateTable;
 import org.apache.tajo.algebra.DropTable;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.OpType;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
@@ -92,7 +93,7 @@ import static org.junit.Assert.*;
  * <code>QueryTestCaseBase</code> basically provides the following methods:
  * <ul>
  *  <li><code>{@link #executeQuery()}</code> - executes a corresponding query and returns an ResultSet instance</li>
- *  <li><code>{@link #executeQuery(String)}</code> - executes a given query file included in the corresponding query
+ *  <li><code>{@link #executeFile(String)}</code> - executes a given query file included in the corresponding query
  *  file in the current class's query directory</li>
  *  <li><code>assertResultSet()</code> - check if the query result is equivalent to the expected result included
  *  in the corresponding result file in the current class's result directory.</li>
@@ -178,7 +179,7 @@ public class QueryTestCaseBase {
     currentDatasetPath = new Path(datasetBasePath, className);
   }
 
-  protected ResultSet execute(String sql) throws Exception {
+  protected ResultSet executeString(String sql) throws Exception {
     return testBase.execute(sql);
   }
 
@@ -189,7 +190,7 @@ public class QueryTestCaseBase {
    * @return ResultSet of query execution.
    */
   public ResultSet executeQuery() throws Exception {
-    return executeQuery(name.getMethodName() + ".sql");
+    return executeFile(name.getMethodName() + ".sql");
   }
 
   /**
@@ -199,7 +200,7 @@ public class QueryTestCaseBase {
    * @param queryFileName The file name to be used to execute a query.
    * @return ResultSet of query execution.
    */
-  public ResultSet executeQuery(String queryFileName) throws Exception {
+  public ResultSet executeFile(String queryFileName) throws Exception {
     Path queryFilePath = getQueryFilePath(queryFileName);
     FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
     assertTrue(queryFilePath.toString() + " existence check", fs.exists(queryFilePath));
@@ -312,6 +313,10 @@ public class QueryTestCaseBase {
     return StorageUtil.concatPath(currentDatasetPath, fileName);
   }
 
+  public String executeDDL(String ddlFileName, @Nullable String [] args) throws Exception {
+    return executeDDL(ddlFileName, null, true, args);
+  }
+
   /**
    *
    * Execute a data definition language (DDL) template. A general SQL DDL statement can be included in this file. But,
@@ -331,11 +336,14 @@ public class QueryTestCaseBase {
    * @param args A list of arguments, each of which is used to replace corresponding variable which has a form of ${i}.
    * @return The table name created
    */
-  public String executeDDL(String ddlFileName, String dataFileName, String ... args) throws Exception {
+  public String executeDDL(String ddlFileName, @Nullable String dataFileName, @Nullable String ... args)
+      throws Exception {
+
     return executeDDL(ddlFileName, dataFileName, true, args);
   }
 
-  private String executeDDL(String ddlFileName, String dataFileName, boolean isLocalTable, String ... args)
+  private String executeDDL(String ddlFileName, @Nullable String dataFileName, boolean isLocalTable,
+                            @Nullable String [] args)
       throws Exception {
 
     Path ddlFilePath = new Path(currentQueryPath, ddlFileName);
@@ -386,7 +394,7 @@ public class QueryTestCaseBase {
    * @param args The list argument to replace each corresponding format string ${i}. ${i} uses zero-based index.
    * @return A string compiled
    */
-  private String compileTemplate(String template, String dataFileName, String... args) {
+  private String compileTemplate(String template, @Nullable String dataFileName, @Nullable String ... args) {
     String result;
     if (dataFileName != null) {
       result = template.replace("${table.path}", "\'" + dataFileName + "'");
@@ -394,8 +402,10 @@ public class QueryTestCaseBase {
       result = template;
     }
 
-    for (int i = 0; i < args.length; i++) {
-      result = result.replace("${" + i + "}", args[i]);
+    if (args != null) {
+      for (int i = 0; i < args.length; i++) {
+        result = result.replace("${" + i + "}", args[i]);
+      }
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 6e73df8..9c96e0e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -251,7 +251,9 @@ public class TajoTestingCluster {
   }
 
   public void shutdownCatalogCluster() {
-    this.catalogServer.shutdown();
+    if (catalogServer != null) {
+      this.catalogServer.shutdown();
+    }
   }
 
   public MiniCatalogServer getMiniCatalogCluster() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
index ad3d676..cb1805d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -75,7 +75,8 @@ public class TpchTestBase {
     for (int i = 0; i < names.length; i++) {
       file = new File("src/test/tpch/" + names[i] + ".tbl");
       if(!file.exists()) {
-        file = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/src/test/tpch/" + names[i] + ".tbl");
+        file = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/src/test/tpch/" + names[i]
+            + ".tbl");
       }
       tables[i] = FileUtil.readTextFile(file).split("\n");
       paths[i] = file.getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 07f726d..4aa1ba2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,8 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -35,8 +34,9 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -48,12 +48,12 @@ import static org.junit.Assert.assertTrue;
 public class TestSortExec {
   private static TajoConf conf;
   private static final String TEST_PATH = "target/test-data/TestPhysicalPlanner";
+  private static TajoTestingCluster util;
   private static CatalogService catalog;
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
   private static AbstractStorageManager sm;
-  private static TajoTestingCluster util;
   private static Path workDir;
   private static Path tablePath;
   private static TableMeta employeeMeta;
@@ -63,8 +63,8 @@ public class TestSortExec {
   @BeforeClass
   public static void setUp() throws Exception {
     conf = new TajoConf();
-    util = new TajoTestingCluster();
-    catalog = util.startCatalogCluster().getCatalog();
+    util = TpchTestBase.getInstance().getTestingCluster();
+    catalog = util.getMaster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
     sm = StorageManagerFactory.getStorageManager(conf, workDir);
 
@@ -99,17 +99,12 @@ public class TestSortExec {
     optimizer = new LogicalOptimizer(conf);
   }
 
-  @After
-  public void tearDown() throws Exception {
-    util.shutdownCatalogCluster();
-  }
-
   public static String[] QUERIES = {
       "select managerId, empId, deptName from employee order by managerId, empId desc" };
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    FileFragment[] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
         .newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index ede0d58..9b940da 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -68,7 +68,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
     assertEquals(5, desc.getStats().getNumRows().intValue());
 
-    ResultSet res2 = executeQuery("check1.sql");
+    ResultSet res2 = executeFile("check1.sql");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
     resultRows1.put(45.0d, new int[]{3, 2});
@@ -107,7 +107,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
     assertEquals(5, desc.getStats().getNumRows().intValue());
 
-    ResultSet res2 = executeQuery("check2.sql");
+    ResultSet res2 = executeFile("check2.sql");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
     resultRows1.put(45.0d, new int[]{3, 2});
@@ -125,7 +125,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
 
   @Test
   public final void testCtasWithGroupby() throws Exception {
-    ResultSet res = executeQuery("CtasWithGroupby.sql");
+    ResultSet res = executeFile("CtasWithGroupby.sql");
     res.close();
 
     ResultSet res2 = executeQuery();
@@ -135,7 +135,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
 
   @Test
   public final void testCtasWithOrderby() throws Exception {
-    ResultSet res = executeQuery("CtasWithOrderby.sql");
+    ResultSet res = executeFile("CtasWithOrderby.sql");
     res.close();
 
     ResultSet res2 = executeQuery();
@@ -145,7 +145,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
 
   @Test
   public final void testCtasWithLimit() throws Exception {
-    ResultSet res = executeQuery("CtasWithLimit.sql");
+    ResultSet res = executeFile("CtasWithLimit.sql");
     res.close();
 
     ResultSet res2 = executeQuery();
@@ -155,7 +155,7 @@ public class TestCTASQuery extends QueryTestCaseBase {
 
   @Test
   public final void testCtasWithUnion() throws Exception {
-    ResultSet res = executeQuery("CtasWithUnion.sql");
+    ResultSet res = executeFile("CtasWithUnion.sql");
     res.close();
 
     ResultSet res2 = executeQuery();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
new file mode 100644
index 0000000..4bda517
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
+
+  @Test
+  public void testPartitionTableJoinSmallTable() throws Exception {
+    executeDDL("customer_ddl.sql", null);
+    ResultSet res = executeFile("insert_into_customer.sql");
+    res.close();
+
+    res = executeQuery();
+    assertResultSet(res);
+    res.close();
+
+    res = executeFile("selfJoinOfPartitionedTable.sql");
+    assertResultSet(res, "selfJoinOfPartitionedTable.result");
+    res.close();
+
+    res = executeFile("testNoProjectionJoinQual.sql");
+    assertResultSet(res, "testNoProjectionJoinQual.result");
+    res.close();
+
+    res = executeFile("testPartialFilterPushDown.sql");
+    assertResultSet(res, "testPartialFilterPushDown.result");
+    res.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 46c1d94..f775acb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -48,7 +48,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testCreateColumnPartitionedTable() throws Exception {
     String tableName ="testCreateColumnPartitionedTable";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
     res.close();
 
@@ -64,7 +64,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception {
     String tableName ="testCreateColumnPartitionedTableWithSelectedColumns";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
     res.close();
 
@@ -72,7 +72,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(3, catalog.getTableDesc(tableName).getSchema().getColumnNum());
     assertEquals(4, catalog.getTableDesc(tableName).getLogicalSchema().getColumnNum());
 
-    res = execute("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
+    res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
         "l_partkey, l_quantity from lineitem");
     res.close();
   }
@@ -80,20 +80,20 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testColumnPartitionedTableByOneColumn() throws Exception {
     String tableName ="testColumnPartitionedTableByOneColumn";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
     res.close();
 
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute("insert overwrite into " + tableName
+    res = executeString("insert overwrite into " + tableName
         + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
 
     TableDesc desc = catalog.getTableDesc(tableName);
     assertPartitionDirectories(desc);
 
-    res = execute(
+    res = executeString(
         "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
@@ -123,13 +123,13 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
     String tableName ="testQueryCasesOnColumnPartitionedTable";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
     res.close();
 
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute(
+    res = executeString(
         "insert overwrite into " + tableName
             + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
@@ -137,13 +137,17 @@ public class TestTablePartitions extends QueryTestCaseBase {
     TableDesc desc = catalog.getTableDesc(tableName);
     assertPartitionDirectories(desc);
 
-    res = executeQuery("case1.sql");
+    res = executeFile("case1.sql");
     assertResultSet(res, "case1.result");
     res.close();
 
-    res = executeQuery("case2.sql");
+    res = executeFile("case2.sql");
     assertResultSet(res, "case2.result");
     res.close();
+
+    res = executeFile("case3.sql");
+    assertResultSet(res, "case3.result");
+    res.close();
   }
 
   @Test
@@ -156,7 +160,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     CatalogService catalog = cluster.getMaster().getCatalog();
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute("insert overwrite into " + tableName
+    res = executeString("insert overwrite into " + tableName
         + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
 
@@ -178,7 +182,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
     assertEquals(5, desc.getStats().getNumRows().intValue());
 
-    res = execute("select * from " + tableName + " where col2 = 2");
+    res = executeString("select * from " + tableName + " where col2 = 2");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
     resultRows1.put(45.0d, new int[]{3, 2});
@@ -198,7 +202,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     resultRows2.put(45.0d, new int[]{3, 2});
     resultRows2.put(38.0d, new int[]{2, 2});
 
-    res = execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
 
     for (int i = 0; i < 3; i++) {
       assertTrue(res.next());
@@ -211,14 +215,14 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
     String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
             "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
             "PARTITION BY column(col1 int4)");
     res.close();
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute(
+    res = executeString(
         "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
     res.close();
     TableDesc desc = catalog.getTableDesc(tableName);
@@ -245,14 +249,14 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
     String tableName = "testColumnPartitionedTableByTwoColumnsWithCompression";
-    ResultSet res = execute("create table " + tableName + " (col3 float8, col4 text) USING csv " +
+    ResultSet res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " +
         "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
         "PARTITION by column(col1 int4, col2 int4)");
     res.close();
 
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute(
+    res = executeString(
         "insert overwrite into " + tableName +
             " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
     res.close();
@@ -287,7 +291,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
     String tableName = "testColumnPartitionedTableByThreeColumnsWithCompression";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col4 text) USING csv " +
             "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
             "partition by column(col1 int4, col2 int4, col3 float8)");
@@ -295,7 +299,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute(
+    res = executeString(
         "insert overwrite into " + tableName +
             " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
@@ -333,7 +337,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
     }
 
-    res = execute("select * from " + tableName + " where col2 = 2");
+    res = executeString("select * from " + tableName + " where col2 = 2");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
     resultRows1.put(45.0d, new int[]{3, 2});
@@ -353,7 +357,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     resultRows2.put(45.0d, new int[]{3, 2});
     resultRows2.put(38.0d, new int[]{2, 2});
 
-    res = execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
     i = 0;
     while(res.next()) {
       assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
@@ -368,7 +372,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
   @Test
   public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
     String tableName = "testColumnPartitionedTableNoMatchedPartition";
-    ResultSet res = execute(
+    ResultSet res = executeString(
         "create table " + tableName + " (col4 text) USING csv " +
             "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
             "partition by column(col1 int4, col2 int4, col3 float8)");
@@ -376,7 +380,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     assertTrue(catalog.existsTable(tableName));
 
-    res = execute(
+    res = executeString(
         "insert overwrite into " + tableName +
             " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
@@ -414,7 +418,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
     }
 
-    res = execute("select * from " + tableName + " where col2 = 9");
+    res = executeString("select * from " + tableName + " where col2 = 9");
     assertFalse(res.next());
     res.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
index 10fa423..0ebeaab 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested1.sql
@@ -1 +1,8 @@
-select l_orderkey + l_partkey as unique_key from lineitem group by l_orderkey + l_partkey;
\ No newline at end of file
+select
+  l_orderkey + l_partkey as unique_key
+from
+  lineitem
+group by
+  l_orderkey + l_partkey
+order by
+  unique_key;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
index e7bbcac..eb86ce0 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testGroupByNested2.sql
@@ -1 +1,10 @@
-select sum(l_orderkey) + sum(l_partkey) as total from lineitem group by l_orderkey + l_partkey;
\ No newline at end of file
+select * from (
+select
+  sum(l_orderkey) + sum(l_partkey) as total
+from
+  lineitem
+group by
+  l_orderkey + l_partkey
+) t1
+order by
+  total;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
new file mode 100644
index 0000000..ca43710
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/customer_ddl.sql
@@ -0,0 +1,9 @@
+CREATE TABLE customer_parts (
+  c_custkey    INT4,
+  c_name    TEXT,
+  c_address    TEXT,
+  c_phone    TEXT,
+  c_acctbal    FLOAT8,
+  c_mktsegment    TEXT,
+  c_comment    TEXT
+) PARTITION BY COLUMN (c_nationkey INT4);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
new file mode 100644
index 0000000..29152b6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/insert_into_customer.sql
@@ -0,0 +1,11 @@
+INSERT OVERWRITE INTO customer_parts
+  SELECT
+    c_custkey,
+    c_name,
+    c_address,
+    c_phone,
+    c_acctbal,
+    c_mktsegment,
+    c_comment,
+    c_nationkey
+  FROM customer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
new file mode 100644
index 0000000..988c0e9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.sql
@@ -0,0 +1,9 @@
+select
+  t1.c_nationkey,
+  t2.c_nationkey
+from
+  customer_parts t1, customer_parts t2
+where
+  t1.c_nationkey = t2.c_nationkey
+order by
+  t1.c_nationkey desc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
new file mode 100644
index 0000000..1db9c1c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testNoProjectionJoinQual.sql
@@ -0,0 +1 @@
+select count(*) from customer_parts t1, customer_parts t2 where t1.c_nationkey = t2.c_nationkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
new file mode 100644
index 0000000..fc9063a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDown.sql
@@ -0,0 +1,9 @@
+select
+  upper(c_name) as c_name, count(1)
+from
+  customer_parts
+where
+  c_name is not null and
+  c_nationkey = 1
+group by
+  c_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
new file mode 100644
index 0000000..27e2066
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinOnPartitionedTables/testPartitionTableJoinSmallTable.sql
@@ -0,0 +1,11 @@
+select
+  c_custkey,
+  c_name,
+  c_nationkey,
+  n_nationkey
+from
+  customer_parts, nation
+where
+  c_nationkey = n_nationkey
+order by
+  c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
index 221b8a9..ccaa5fb 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testFullOuterJoin1.sql
@@ -1 +1,8 @@
-select c_custkey, orders.o_orderkey from orders full outer join customer on c_custkey = o_orderkey;
\ No newline at end of file
+select
+  c_custkey,
+  orders.o_orderkey
+from
+  orders full outer join customer on c_custkey = o_orderkey
+order by
+  c_custkey,
+  orders.o_orderkey;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
index 59a876a..f946e1d 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoin1.sql
@@ -1 +1,7 @@
-select c_custkey, orders.o_orderkey from customer left outer join orders on c_custkey = o_orderkey;
\ No newline at end of file
+select
+  c_custkey,
+  orders.o_orderkey
+from
+  customer left outer join orders on c_custkey = o_orderkey
+order by
+  c_custkey, o_orderkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
index 334c161..f5b0ba7 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr1.sql
@@ -1 +1,9 @@
-select c_custkey, orders.o_orderkey, 'val' as val from customer left outer join orders on c_custkey = o_orderkey;
\ No newline at end of file
+select
+  c_custkey,
+  orders.o_orderkey,
+  'val' as val
+from
+  customer left outer join orders on c_custkey = o_orderkey
+order by
+  c_custkey,
+  o_orderkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
index 3256e28..7333d54 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr2.sql
@@ -1 +1,9 @@
-select c_custkey, o.o_orderkey, 'val' as val from customer left outer join (select * from orders) o on c_custkey = o.o_orderkey
\ No newline at end of file
+select
+  c_custkey,
+  o.o_orderkey,
+  'val' as val
+from
+  customer left outer join (select * from orders) o on c_custkey = o.o_orderkey
+order by
+  c_custkey,
+  o_orderkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
index 03cdae2..90be13b 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testLeftOuterJoinWithConstantExpr3.sql
@@ -12,4 +12,6 @@ left outer join (
   group by
     c_custkey)
   b
-on a.c_custkey = b.c_custkey;
\ No newline at end of file
+on a.c_custkey = b.c_custkey
+order by
+  c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
index cf9896d..ba4c713 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testRightOuterJoin1.sql
@@ -1 +1,8 @@
-select c_custkey, orders.o_orderkey from orders right outer join customer on c_custkey = o_orderkey;
\ No newline at end of file
+select
+  c_custkey,
+  orders.o_orderkey
+from
+  orders right outer join customer on c_custkey = o_orderkey
+order by
+  c_custkey,
+  orders.o_orderkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
new file mode 100644
index 0000000..6cb1ea1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestTablePartitions/case3.sql
@@ -0,0 +1,8 @@
+select
+  l.l_orderkey,
+  p.col1,
+  key
+from lineitem as l, testQueryCasesOnColumnPartitionedTable as p
+where
+  (key = 45.0 or key = 38.0) and l.l_orderkey = p.col1;
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
index 09b14eb..b2d7dce 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/default/create_partitioned_table_as_select.sql
@@ -1,6 +1,17 @@
-CREATE TABLE sales ( col1 int, col2 int)
-PARTITION BY COLUMN (col3 int, col4 float, col5 text)
-AS SELECT col1, col2, col3, col4, col5 FROM sales_src
-   WHERE col1 > 16
+CREATE TABLE sales (
+  col1 int,
+  col2 int)
+PARTITION BY COLUMN (col3 int, col4 float, col5 text) AS
+
+SELECT
+  col1,
+  col2,
+  col3,
+  col4,
+  col5
+FROM
+  sales_src
+WHERE
+  col1 > 16
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
index 85c5598..788a084 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested1.result
@@ -2,5 +2,5 @@ unique_key
 -------------------------------
 2
 4
-6
-5
\ No newline at end of file
+5
+6
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
index edae882..8656add 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testGroupByNested2.result
@@ -2,5 +2,5 @@ total
 -------------------------------
 4
 4
-6
-5
\ No newline at end of file
+5
+6
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4179a7c9/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
new file mode 100644
index 0000000..ea4a06b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestJoinOnPartitionedTables/selfJoinOfPartitionedTable.result
@@ -0,0 +1,7 @@
+c_nationkey,c_nationkey
+-------------------------------
+15,15
+13,13
+4,4
+3,3
+1,1
\ No newline at end of file