You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/17 10:23:34 UTC

[04/12] TAJO-501: Rewrite the projection part of logical planning.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 74ef38a..9d66dd5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -24,45 +24,92 @@ import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.NamedExprsManager;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 
 public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
+
+  private String [] guessRelationsFromJoinQual(LogicalPlan.QueryBlock block, EvalNode joinCondition)
+      throws PlanningException {
+
+    // Note that we can guarantee that each join qual used here is a singleton.
+    // This is because we use dissect a join qual into conjunctive normal forms.
+    // In other words, each join qual has a form 'col1 = col2'.
+    Column leftExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getLeftExpr()).get(0);
+    Column rightExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getRightExpr()).get(0);
+
+    // 0 - left table, 1 - right table
+    String [] relationNames = new String[2];
+
+    NamedExprsManager namedExprsMgr = block.getNamedExprsManager();
+    if (leftExpr.hasQualifier()) {
+      relationNames[0] = leftExpr.getQualifier();
+    } else {
+      if (namedExprsMgr.isAliasedName(leftExpr.getColumnName())) {
+        String columnName = namedExprsMgr.getOriginalName(leftExpr.getColumnName());
+        String [] parts = columnName.split("\\.");
+
+        if (parts.length != 2) {
+          throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+        }
+
+        relationNames[0] = parts[0];
+      } else {
+        throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+      }
+    }
+
+    if (rightExpr.hasQualifier()) {
+      relationNames[1] = rightExpr.getQualifier();
+    } else {
+      if (namedExprsMgr.isAliasedName(rightExpr.getColumnName())) {
+        String columnName = namedExprsMgr.getOriginalName(rightExpr.getColumnName());
+        String [] parts = columnName.split("\\.");
+        if (parts.length != 2) {
+          throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+        }
+        relationNames[1] = parts[0];
+      } else {
+        throw new PlanningException("Cannot expect a referenced relation: " + rightExpr);
+      }
+    }
+
+    return relationNames;
+  }
   public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       JoinNode joinNode) throws PlanningException {
     Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
     Set<EvalNode> nonJoinQuals = Sets.newHashSet();
     for (EvalNode singleQual : cnf) {
       if (PlannerUtil.isJoinQual(singleQual)) {
-        List<Column> leftExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getLeftExpr());
-        List<Column> rightExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getRightExpr());
 
-        String leftExprRelation = leftExpr.get(0).getQualifier();
-        String rightExprRelName = rightExpr.get(0).getQualifier();
+        String [] relations = guessRelationsFromJoinQual(block, singleQual);
+        String leftExprRelName = relations[0];
+        String rightExprRelName = relations[1];
 
         Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
 
-        boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelation);
+        boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelName);
         JoinEdge edge;
-        edge = getEdge(leftExprRelation, rightExprRelName);
+        edge = getEdge(leftExprRelName, rightExprRelName);
 
         if (edge != null) {
           edge.addJoinQual(singleQual);
         } else {
           if (isLeftExprForLeftTable) {
             edge = new JoinEdge(joinNode.getJoinType(),
-                block.getRelation(leftExprRelation), block.getRelation(rightExprRelName), singleQual);
-            addEdge(leftExprRelation, rightExprRelName, edge);
+                block.getRelation(leftExprRelName), block.getRelation(rightExprRelName), singleQual);
+            addEdge(leftExprRelName, rightExprRelName, edge);
           } else {
             edge = new JoinEdge(joinNode.getJoinType(),
-                block.getRelation(rightExprRelName), block.getRelation(leftExprRelation), singleQual);
-            addEdge(rightExprRelName, leftExprRelation, edge);
+                block.getRelation(rightExprRelName), block.getRelation(leftExprRelName), singleQual);
+            addEdge(rightExprRelName, leftExprRelName, edge);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index e0f5f4c..ed82cef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.collect.Sets;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.DatumFactory;
@@ -29,8 +28,11 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalType;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Set;
 
 public abstract class AggregationExec extends UnaryPhysicalExec {
@@ -43,24 +45,12 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
   protected EvalContext evalContexts [];
   protected Schema evalSchema;
 
-  protected EvalNode havingQual;
-  protected EvalContext havingContext;
-
   public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
                          PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
 
-    if (plan.hasHavingCondition()) {
-      this.havingQual = plan.getHavingCondition();
-      this.havingContext = plan.getHavingCondition().newContext();
-    }
-
-    if (plan.getHavingSchema() != null) {
-      this.evalSchema = plan.getHavingSchema();
-    } else {
-      this.evalSchema = plan.getOutSchema();
-    }
+    evalSchema = plan.getOutSchema();
 
     nonNullGroupingFields = Sets.newHashSet();
     // keylist will contain a list of IDs of grouping column
@@ -72,27 +62,25 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
       nonNullGroupingFields.add(col);
     }
 
-    // measureList will contain a list of IDs of measure fields
-    int valueIdx = 0;
-    measureList = new int[plan.getTargets().length - keylist.length];
-    if (measureList.length > 0) {
-      search: for (int inputIdx = 0; inputIdx < plan.getTargets().length; inputIdx++) {
-        for (int key : keylist) { // eliminate key field
-          if (plan.getTargets()[inputIdx].getColumnSchema().getColumnName()
-              .equals(inSchema.getColumn(key).getColumnName())) {
-            continue search;
-          }
-        }
-        measureList[valueIdx] = inputIdx;
-        valueIdx++;
+    // measureList will contain a list of measure field indexes against the target list.
+    List<Integer> measureIndexes = TUtil.newList();
+    for (int i = 0; i < plan.getTargets().length; i++) {
+      Target target = plan.getTargets()[i];
+      if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
+        measureIndexes.add(i);
       }
     }
 
+    measureList = new int[measureIndexes.size()];
+    for (int i = 0; i < measureIndexes.size(); i++) {
+      measureList[i] = measureIndexes.get(i);
+    }
+
     evals = new EvalNode[plan.getTargets().length];
     evalContexts = new EvalContext[plan.getTargets().length];
     for (int i = 0; i < plan.getTargets().length; i++) {
       Target t = plan.getTargets()[i];
-      if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getColumnSchema())) {
+      if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
         evals[i] = new ConstEval(DatumFactory.createNullDatum());
         evalContexts[i] = evals[i].newContext();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index b2737b7..cf56200 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,13 +18,13 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 
@@ -35,46 +35,57 @@ import java.util.List;
 
 public class BNLJoinExec extends BinaryPhysicalExec {
   // from logical plan
-  private JoinNode plan;
-  private EvalNode joinQual;
-  private EvalContext qualCtx;
+  private final JoinNode plan;
+  private final boolean hasJoinQual;
+  private final EvalNode joinQual;
+  private final EvalContext qualCtx;
 
-  private final List<Tuple> outerTupleSlots;
-  private final List<Tuple> innerTupleSlots;
-  private Iterator<Tuple> outerIterator;
-  private Iterator<Tuple> innerIterator;
+  private final List<Tuple> leftTupleSlots;
+  private final List<Tuple> rightTupleSlots;
+  private Iterator<Tuple> leftIterator;
+  private Iterator<Tuple> rightIterator;
 
-  private boolean innerEnd;
-  private boolean outerEnd;
+  private boolean leftEnd;
+  private boolean rightEnd;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
-  private Tuple outerTuple = null;
+  private Tuple leftTuple = null;
   private Tuple outputTuple = null;
-  private Tuple innext = null;
+  private Tuple rightNext = null;
 
   private final int TUPLE_SLOT_SIZE = 10000;
 
   // projection
-  private final int[] targetIds;
+  private final Projector projector;
+  private final EvalContext [] evalContexts;
 
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
-                     final PhysicalExec outer, PhysicalExec inner) {
-    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner);
+                     final PhysicalExec leftExec, PhysicalExec rightExec) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftExec, rightExec);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
     if (joinQual != null) { // if join type is not 'cross join'
+      hasJoinQual = true;
       this.qualCtx = this.joinQual.newContext();
+    } else {
+      hasJoinQual = false;
+      this.qualCtx = null;
     }
-    this.outerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
-    this.innerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
-    this.outerIterator = outerTupleSlots.iterator();
-    this.innerIterator = innerTupleSlots.iterator();
-    this.innerEnd = false;
-    this.outerEnd = false;
+    this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.leftIterator = leftTupleSlots.iterator();
+    this.rightIterator = rightTupleSlots.iterator();
+    this.rightEnd = false;
+    this.leftEnd = false;
 
     // for projection
-    targetIds = RowStoreUtil.getTargetIds(inSchema, outSchema);
+    if (!plan.hasTargets()) {
+      plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
+    }
+
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -87,106 +98,108 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
   public Tuple next() throws IOException {
 
-    if (outerTupleSlots.isEmpty()) {
+    if (leftTupleSlots.isEmpty()) {
       for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
         Tuple t = leftChild.next();
         if (t == null) {
-          outerEnd = true;
+          leftEnd = true;
           break;
         }
-        outerTupleSlots.add(t);
+        leftTupleSlots.add(t);
       }
-      outerIterator = outerTupleSlots.iterator();
-      outerTuple = outerIterator.next();
+      leftIterator = leftTupleSlots.iterator();
+      leftTuple = leftIterator.next();
     }
 
-    if (innerTupleSlots.isEmpty()) {
+    if (rightTupleSlots.isEmpty()) {
       for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
         Tuple t = rightChild.next();
         if (t == null) {
-          innerEnd = true;
+          rightEnd = true;
           break;
         }
-        innerTupleSlots.add(t);
+        rightTupleSlots.add(t);
       }
-      innerIterator = innerTupleSlots.iterator();
+      rightIterator = rightTupleSlots.iterator();
     }
 
-    if((innext = rightChild.next()) == null){
-      innerEnd = true;
+    if((rightNext = rightChild.next()) == null){
+      rightEnd = true;
     }
 
     while (true) {
-      if (!innerIterator.hasNext()) { // if inneriterator ended
-        if (outerIterator.hasNext()) { // if outertupleslot remains
-          outerTuple = outerIterator.next();
-          innerIterator = innerTupleSlots.iterator();
+      if (!rightIterator.hasNext()) { // if leftIterator ended
+        if (leftIterator.hasNext()) { // if rightTupleslot remains
+          leftTuple = leftIterator.next();
+          rightIterator = rightTupleSlots.iterator();
         } else {
-          if (innerEnd) {
+          if (rightEnd) {
             rightChild.rescan();
-            innerEnd = false;
+            rightEnd = false;
             
-            if (outerEnd) {
+            if (leftEnd) {
               return null;
             }
-            outerTupleSlots.clear();
+            leftTupleSlots.clear();
             for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
               Tuple t = leftChild.next();
               if (t == null) {
-                outerEnd = true;
+                leftEnd = true;
                 break;
               }
-              outerTupleSlots.add(t);
+              leftTupleSlots.add(t);
             }
-            if (outerTupleSlots.isEmpty()) {
+            if (leftTupleSlots.isEmpty()) {
               return null;
             }
-            outerIterator = outerTupleSlots.iterator();
-            outerTuple = outerIterator.next();
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
             
           } else {
-            outerIterator = outerTupleSlots.iterator();
-            outerTuple = outerIterator.next();
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
           }
           
-          innerTupleSlots.clear();
-          if (innext != null) {
-            innerTupleSlots.add(innext);
-            for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill inner
+          rightTupleSlots.clear();
+          if (rightNext != null) {
+            rightTupleSlots.add(rightNext);
+            for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right
               Tuple t = rightChild.next();
               if (t == null) {
-                innerEnd = true;
+                rightEnd = true;
                 break;
               }
-              innerTupleSlots.add(t);
+              rightTupleSlots.add(t);
             }
           } else {
-            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill inner
+            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right
               Tuple t = rightChild.next();
               if (t == null) {
-                innerEnd = true;
+                rightEnd = true;
                 break;
               }
-              innerTupleSlots.add(t);
+              rightTupleSlots.add(t);
             }
           }
           
-          if ((innext = rightChild.next()) == null) {
-            innerEnd = true;
+          if ((rightNext = rightChild.next()) == null) {
+            rightEnd = true;
           }
-          innerIterator = innerTupleSlots.iterator();
+          rightIterator = rightTupleSlots.iterator();
         }
       }
 
-      frameTuple.set(outerTuple, innerIterator.next());
-      if (joinQual != null) {
+      frameTuple.set(leftTuple, rightIterator.next());
+      if (hasJoinQual) {
         joinQual.eval(qualCtx, inSchema, frameTuple);
         if (joinQual.terminate(qualCtx).asBool()) {
-          RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outputTuple);
           return outputTuple;
         }
       } else {
-        RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outputTuple);
         return outputTuple;
       }
     }
@@ -195,10 +208,10 @@ public class BNLJoinExec extends BinaryPhysicalExec {
   @Override
   public void rescan() throws IOException {
     super.rescan();
-    innerEnd = false;
-    innerTupleSlots.clear();
-    outerTupleSlots.clear();
-    innerIterator = innerTupleSlots.iterator();
-    outerIterator = outerTupleSlots.iterator();
+    rightEnd = false;
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+    rightIterator = rightTupleSlots.iterator();
+    leftIterator = leftTupleSlots.iterator();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 5de6634..30c1fa7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -65,7 +65,7 @@ public class BSTIndexScanExec extends PhysicalExec {
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
     this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     this.reader = new BSTIndex(sm.getFileSystem().getConf()).
         getIndexReader(fileName, keySchema, comparator);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index 4723ecc..67d6baa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -22,37 +22,72 @@ import java.util.Stack;
 
 public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
 
-  @Override
-  public RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visit(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
 
-    if (exec instanceof SeqScanExec) {
-      return visitSeqScan((SeqScanExec) exec, stack, context);
-    } else if (exec instanceof SelectionExec) {
-      return visitSelection((SelectionExec) exec, stack, context);
-    } else if (exec instanceof SortExec) {
-      return visitSort((SortExec) exec, stack, context);
-    } else if (exec instanceof SortAggregateExec) {
-      return visitSortAggregation((SortAggregateExec) exec, stack, context);
-    } else if (exec instanceof ProjectionExec) {
-      return visitProjection((ProjectionExec) exec, stack, context);
+    // Please keep all physical executors except for abstract class.
+    // They should be ordered in an lexicography order of their names for easy code maintenance.
+    if (exec instanceof BNLJoinExec) {
+      return visitBNLJoin(context, (BNLJoinExec) exec, stack);
+    } else if (exec instanceof BSTIndexScanExec) {
+      return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack);
+    } else if (exec instanceof ColumnPartitionedTableStoreExec) {
+      return visitColumnPartitionedTableStore(context, (ColumnPartitionedTableStoreExec) exec, stack);
+    } else if (exec instanceof EvalExprExec) {
+      return visitEvalExpr(context, (EvalExprExec) exec, stack);
+    } else if (exec instanceof ExternalSortExec) {
+      return visitExternalSort(context, (ExternalSortExec) exec, stack);
+    } else if (exec instanceof HashAggregateExec) {
+      return visitHashAggregate(context, (HashAggregateExec) exec, stack);
+    } else if (exec instanceof HashFullOuterJoinExec) {
+      return visitHashFullOuterJoin(context, (HashFullOuterJoinExec) exec, stack);
     } else if (exec instanceof HashJoinExec) {
-      return visitHashJoin((HashJoinExec) exec, stack, context);
+      return visitHashJoin(context, (HashJoinExec) exec, stack);
     } else if (exec instanceof HashLeftAntiJoinExec) {
-      return visitHashAntiJoin((HashLeftAntiJoinExec) exec, stack, context);
+      return visitHashLeftAntiJoin(context, (HashLeftAntiJoinExec) exec, stack);
+    } else if (exec instanceof HashLeftOuterJoinExec) {
+      return visitHashLeftOuterJoin(context, (HashLeftOuterJoinExec) exec, stack);
     } else if (exec instanceof HashLeftSemiJoinExec) {
-      return visitHashSemiJoin((HashLeftSemiJoinExec) exec, stack, context);
+      return visitLeftHashSemiJoin(context, (HashLeftSemiJoinExec) exec, stack);
+    } else if (exec instanceof HashShuffleFileWriteExec) {
+      return visitHashShuffleFileWrite(context, (HashShuffleFileWriteExec) exec, stack);
+    } else if (exec instanceof HavingExec) {
+      return visitHaving(context, (HavingExec) exec, stack);
     } else if (exec instanceof LimitExec) {
-      return visitLimit((LimitExec) exec, stack, context);
-    } else {
-      throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+      return visitLimit(context, (LimitExec) exec, stack);
+    } else if (exec instanceof MemSortExec) {
+      return visitMemSort(context, (MemSortExec) exec, stack);
+    } else if (exec instanceof MergeFullOuterJoinExec) {
+      return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack);
+    } else if (exec instanceof MergeJoinExec) {
+      return visitMergeJoin(context, (MergeJoinExec) exec, stack);
+    } else if (exec instanceof NLJoinExec) {
+      return visitNLJoin(context, (NLJoinExec) exec, stack);
+    } else if (exec instanceof NLLeftOuterJoinExec) {
+      return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
+    } else if (exec instanceof ProjectionExec) {
+      return visitProjection(context, (ProjectionExec) exec, stack);
+    } else if (exec instanceof RangeShuffleFileWriteExec) {
+      return visitRangeShuffleFileWrite(context, (RangeShuffleFileWriteExec) exec, stack);
+    } else if (exec instanceof RightOuterMergeJoinExec) {
+      return visitRightOuterMergeJoin(context, (RightOuterMergeJoinExec) exec, stack);
+    } else if (exec instanceof SelectionExec) {
+      return visitSelection(context, (SelectionExec) exec, stack);
+    } else if (exec instanceof SeqScanExec) {
+      return visitSeqScan(context, (SeqScanExec) exec, stack);
+    } else if (exec instanceof SortAggregateExec) {
+      return visitSortAggregate(context, (SortAggregateExec) exec, stack);
+    } else if (exec instanceof StoreTableExec) {
+      return visitStoreTable(context, (StoreTableExec) exec, stack);
     }
+
+    throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
   }
 
   private RESULT visitUnaryExecutor(UnaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     stack.push(exec);
-    RESULT r = visitChild(exec.getChild(), stack, context);
+    RESULT r = visit(exec.getChild(), stack, context);
     stack.pop();
     return r;
   }
@@ -60,68 +95,162 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   private RESULT visitBinaryExecutor(BinaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     stack.push(exec);
-    RESULT r = visitChild(exec.getLeftChild(), stack, context);
-    visitChild(exec.getRightChild(), stack, context);
+    RESULT r = visit(exec.getLeftChild(), stack, context);
+    visit(exec.getRightChild(), stack, context);
     stack.pop();
     return r;
   }
 
   @Override
-  public RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
-    return visitUnaryExecutor(exec, stack, context);
+    return null;
+  }
+
+  @Override
+  public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
   }
 
   @Override
-  public RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context) {
+  public RESULT visitColumnPartitionedTableStore(CONTEXT context, ColumnPartitionedTableStoreExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
     return null;
   }
 
   @Override
-  public RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
-    return visitUnaryExecutor(exec, stack, context);
+    return null;
   }
 
   @Override
-  public RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
-    return visitBinaryExecutor(exec, stack, context);
+    return null;
   }
 
   @Override
-  public RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
-    return visitUnaryExecutor(exec, stack, context);
+    return null;
   }
 
   @Override
-  public RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
-    return visitUnaryExecutor(exec, stack, context);
+    return null;
   }
 
   @Override
-  public RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }
 
   @Override
-  public RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }
 
   @Override
-  public RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack) throws
+      PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }
 
   @Override
-  public RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws
+      PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitUnaryExecutor(exec, stack, context);
   }
+
+  @Override
+  public RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack) {
+    return null;
+  }
+
+  @Override
+  public RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index c1f84a3..4480747 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -35,9 +35,9 @@ public class EvalExprExec extends PhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema());
     this.plan = plan;
 
-    evalContexts = new EvalContext[plan.getExprs().length];
-    for (int i = 0; i < plan.getExprs().length; i++) {
-      evalContexts[i] = plan.getExprs()[i].getEvalTree().newContext();
+    evalContexts = new EvalContext[plan.getTargets().length];
+    for (int i = 0; i < plan.getTargets().length; i++) {
+      evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
     }
   }
 
@@ -50,7 +50,7 @@ public class EvalExprExec extends PhysicalExec {
   */
   @Override
   public Tuple next() throws IOException {    
-    Target [] targets = plan.getExprs();
+    Target [] targets = plan.getTargets();
     Tuple t = new VTuple(targets.length);
     for (int i = 0; i < targets.length; i++) {
       targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 681e340..ed2f275 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,11 +18,11 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -86,29 +86,15 @@ public class HashAggregateExec extends AggregationExec {
     }
 
     EvalContext [] ctx;
-    if (havingQual == null) {
-      if (iterator.hasNext()) {
-      ctx =  iterator.next().getValue();
 
+    if (iterator.hasNext()) {
+      ctx =  iterator.next().getValue();
       for (int i = 0; i < ctx.length; i++) {
         tuple.put(i, evals[i].terminate(ctx[i]));
       }
 
       return tuple;
-      } else {
-        return null;
-      }
     } else {
-      while(iterator.hasNext()) {
-        ctx =  iterator.next().getValue();
-        for (int i = 0; i < ctx.length; i++) {
-          tuple.put(i, evals[i].terminate(ctx[i]));
-        }
-        havingQual.eval(havingContext, evalSchema, tuple);
-        if (havingQual.terminate(havingContext).asBool()) {
-          return tuple;
-        }
-      }
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 550f0ee..caea5d9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -95,7 +95,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 8a42cff..e08e07d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -60,32 +60,32 @@ public class HashJoinExec extends BinaryPhysicalExec {
   protected final Projector projector;
   protected final EvalContext [] evalContexts;
 
-  public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
-      PhysicalExec inner) {
-    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
-        plan.getOutSchema(), outer, inner);
+  public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
+      PhysicalExec rightExec) {
+    super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
+        leftExec, rightExec);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
     this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
-        outer.getSchema(), inner.getSchema());
+        leftExec.getSchema(), rightExec.getSchema());
 
     leftKeyList = new int[joinKeyPairs.size()];
     rightKeyList = new int[joinKeyPairs.size()];
 
     for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+      leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
     }
 
     for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+      rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
     }
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index f0022cb..841ff5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -93,7 +93,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
new file mode 100644
index 0000000..41ff7bf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.HavingNode;
+import org.apache.tajo.engine.planner.logical.SelectionNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class HavingExec extends UnaryPhysicalExec  {
+  private final EvalNode qual;
+  private final EvalContext qualCtx;
+  private final Tuple outputTuple;
+
+  public HavingExec(TaskAttemptContext context,
+                    HavingNode plan,
+                    PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+
+    this.qual = plan.getQual();
+    this.qualCtx = this.qual.newContext();
+    this.outputTuple = new VTuple(outSchema.getColumnNum());
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while ((tuple = child.next()) != null) {
+      qual.eval(qualCtx, inSchema, tuple);
+      if (qual.terminate(qualCtx).asBool()) {
+          return tuple;
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index e1fe440..446755d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -93,7 +93,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index ebddd07..e128fea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -89,7 +89,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 67908da..055c66e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -59,7 +59,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.renew();
+    evalContexts = projector.newContexts();
 
     // for join
     needNewOuter = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index ebc78e2..305cdd2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -62,7 +62,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.renew();
+    evalContexts = projector.newContexts();
 
     // for join
     needNextRightTuple = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index f0abef5..9ede15d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -21,15 +21,82 @@ package org.apache.tajo.engine.planner.physical;
 import java.util.Stack;
 
 public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
-  RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context);
-  RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+
+  RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitColumnPartitionedTableStore(CONTEXT context, ColumnPartitionedTableStoreExec exec,
+                                          Stack<PhysicalExec> stack) throws PhysicalPlanningException;
+
+  RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index bd773ed..fdd1839 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -23,17 +23,16 @@ import java.util.Stack;
 public class PhysicalPlanUtil {
   public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
       throws PhysicalPlanningException {
-    return (T) new FindVisitor().visitChild(plan, new Stack<PhysicalExec>(), clazz);
+    return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz);
   }
 
   private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
-    public PhysicalExec visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+    public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
         throws PhysicalPlanningException {
-
       if (target.isAssignableFrom(exec.getClass())) {
         return exec;
       } else {
-        return super.visitChild(exec, stack, target);
+        return super.visit(exec, stack, target);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index f8ec801..c57089e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,24 +21,24 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.planner.logical.Projectable;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.ProjectionNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 
 import java.io.IOException;
 
 public class ProjectionExec extends UnaryPhysicalExec {
-  private final ProjectionNode plan;
+  private final Projectable plan;
 
   // for projection
   private Tuple outTuple;
   private EvalContext[] evalContexts;
   private Projector projector;
   
-  public ProjectionExec(TaskAttemptContext context, ProjectionNode plan,
+  public ProjectionExec(TaskAttemptContext context, Projectable plan,
       PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
@@ -49,7 +49,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
 
     this.outTuple = new VTuple(outSchema.getColumnNum());
     this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 8c99215..2f1d33d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -91,7 +91,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 0e6640c..5158dc0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,36 +18,24 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public class SelectionExec extends UnaryPhysicalExec  {
   private final EvalNode qual;
   private final EvalContext qualCtx;
-  private final Tuple outputTuple;
-  // projection
-  private int [] targetIds;
 
   public SelectionExec(TaskAttemptContext context,
                        SelectionNode plan,
                        PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
-
     this.qual = plan.getQual();
     this.qualCtx = this.qual.newContext();
-    // for projection
-    if (!inSchema.equals(outSchema)) {
-      targetIds = RowStoreUtil.getTargetIds(inSchema, outSchema);
-    }
-
-    this.outputTuple = new VTuple(outSchema.getColumnNum());
   }
 
   @Override
@@ -55,13 +43,8 @@ public class SelectionExec extends UnaryPhysicalExec  {
     Tuple tuple;
     while ((tuple = child.next()) != null) {
       qual.eval(qualCtx, inSchema, tuple);
-      if (qual.terminate(qualCtx).asBool()) {
-        if (targetIds != null) {
-          RowStoreUtil.project(tuple, outputTuple, targetIds);
-          return outputTuple;
-        } else {
+      if (qual.terminate(qualCtx).isTrue()) {
           return tuple;
-        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 3de75a9..6d3d6b1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -151,7 +151,7 @@ public class SeqScanExec extends PhysicalExec {
     }
 
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
+    this.evalContexts = projector.newContexts();
 
     if (fragments.length > 1) {
       this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 44af891..ddcdd03 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -43,7 +43,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
   @Override
   public boolean isEligible(LogicalPlan plan) {
     for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      if (block.hasSelectionNode() || block.hasJoinNode()) {
+      if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 4a7a324..72ff67c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.rewrite;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -359,7 +359,8 @@ public class PartitionedTableRewriter implements RewriteRule {
       try {
         Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
         plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
-        PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+        PartitionedTableScanNode rewrittenScanNode =
+            new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
         updateTableStat(rewrittenScanNode);
         PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
       } catch (IOException e) {