You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:53 UTC

[31/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
new file mode 100644
index 0000000..ff1f7b3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Physical operator that evaluates a full outer join by merging its two inputs.
+ *
+ * <p>{@link #next()} advances over both children using {@code joincomparator}. Tuples
+ * whose join keys compare equal are buffered in per-side slots and emitted as their
+ * cross product; a tuple without a partner on the other side is emitted once, with the
+ * columns of the missing side supplied by a null-padded tuple.</p>
+ *
+ * <p>The tuple returned by {@link #next()} is a single shared instance that is
+ * overwritten by the following call.</p>
+ *
+ * <p>NOTE(review): the merge presumably relies on both children delivering tuples
+ * ordered by the sort keys given to the constructor -- confirm with the planner.</p>
+ */
+public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+
+  // temporary tuples and state for the merge join
+  private FrameTuple frameTuple;
+  // look-ahead tuple of each child; null means "not fetched yet" or "exhausted"
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  // shared output tuple, refilled by the projector on every call
+  private Tuple outTuple = null;
+  // left slot tuple currently being combined with the right slots
+  private Tuple leftNext = null;
+
+  // tuples of the current key group, one list per side
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> rightTupleSlots;
+
+  // compares a left tuple with a right tuple on the join keys
+  private JoinTupleComparator joincomparator = null;
+  // [0] compares two left tuples, [1] compares two right tuples
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  // true once at least one child has been exhausted
+  private boolean end = false;
+
+  // projection
+  private Projector projector;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  // read cursors into the slots; -1 means that no key group is buffered
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  // true when 'end' was raised while buffering a key group that still has to be emitted
+  boolean endInPopulationStage = false;
+  // true once the first tuple of the right child has been requested
+  private boolean initRightDone = false;
+
+  /**
+   * @param context      the task attempt context handed to the parent operator
+   * @param plan         the join node; it must carry a join qualifier
+   * @param leftChild    the left input
+   * @param rightChild   the right input
+   * @param leftSortKey  sort specs of the left input used to build the join comparator
+   * @param rightSortKey sort specs of the right input used to build the join comparator
+   * @throws IllegalArgumentException if {@code plan} has no join qualifier
+   */
+  public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+
+    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = leftSortKey;
+    sortSpecs[1] = rightSortKey;
+
+    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+        rightChild.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+
+    leftNumCols = leftChild.getSchema().size();
+    rightNumCols = rightChild.getSchema().size();
+  }
+
+  /** Returns the logical join node this operator was created from. */
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  /**
+   * Produces the next joined tuple.
+   *
+   * @return the shared output tuple, or {@code null} when both inputs are exhausted
+   * @throws IOException if reading from a child fails
+   */
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      // a new round starts when no key group is buffered (-1/-1) or when the
+      // buffered key group has been emitted completely
+      boolean newRound = false;
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if(newRound == true){
+
+        if (end) {
+
+          ////////////////////////////////////////////////////////////////////////
+          // FINALIZING STAGE
+          ////////////////////////////////////////////////////////////////////////
+          // one side is exhausted: every remaining tuple of the right side is
+          // emitted left-padded with nulls and every remaining tuple of the
+          // left side is emitted right-padded with nulls
+
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if((leftTuple == null) && (rightTuple == null)) {
+            return null;
+          }
+
+          if((leftTuple == null) && (rightTuple != null)){
+            // output the right tuple with a null-padded left side
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(frameTuple, outTuple);
+            // move forward on the right side before returning
+            rightTuple = rightChild.next();
+            return outTuple;
+          }
+
+          if((leftTuple != null) && (rightTuple == null)){
+            // output the left tuple with a null-padded right side
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(frameTuple, outTuple);
+            // move forward on the left side before returning
+            leftTuple = leftChild.next();
+            return outTuple;
+          }
+        } // if end
+
+        ////////////////////////////////////////////////////////////////////////
+        // INITIALIZING STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // make sure a look-ahead tuple is available on each side; an exhausted
+        // side switches the operator to the finalizing stage
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+          if( leftTuple == null){
+            end = true;
+            continue;
+          }
+        }
+        if (rightTuple == null) {
+          rightTuple = rightChild.next();
+          initRightDone = true;
+          if (rightTuple == null) {
+            end = true;
+            continue;
+          }
+        }
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        rightTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+        ////////////////////////////////////////////////////////////////////////
+        // Comparison and Move Forward Stage
+        ////////////////////////////////////////////////////////////////////////
+        // while the keys differ, the tuple with the smaller key has no partner:
+        // emit it null-padded and advance that side
+        int cmp;
+        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          if (cmp > 0) {
+
+            // the right tuple has no partner: output it with a null-padded left side
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(frameTuple, outTuple);
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // the left tuple has no partner: output it with a null-padded right side
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(frameTuple, outTuple);
+            // BEFORE RETURN, MOVE FORWARD
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } // if (cmp < 0)
+        } //while
+
+
+        ////////////////////////////////////////////////////////////////////////
+        // SLOTS POPULATION STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // once a match is found, retain all tuples with this key in tuple slots
+        // on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endLeft = false;
+          boolean endRight = false;
+
+          // copy every left tuple that compares equal to the first one of the group
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              endLeft = true;
+            }
+
+
+          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+
+          // same for the right side
+          previous = new VTuple(rightTuple);
+          do {
+            rightTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endRight = true;
+            }
+
+          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          // a side ran dry while buffering: the buffered group must still be emitted
+          if ((endLeft == true) || (endRight == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+
+        } // if end false
+      } // if newRound
+
+
+      ////////////////////////////////////////////////////////////////////////
+      // RESULTS STAGE
+      ////////////////////////////////////////////////////////////////////////
+      // output the cross product of the buffered slots, either because no side
+      // has ended yet, or because a side ended during the slots population
+      // step (i.e. the end only concerns the next round)
+      if(!end || (end && endInPopulationStage)){
+        if(posLeftTupleSlots == 0){
+          // first visit of this key group: start with the first left tuple
+          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+          frameTuple.set(leftNext, aTuple);
+          // NOTE(review): the result of the qualifier is discarded, so every pair of
+          // the key group is emitted -- confirm that this is intended
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
+          return outTuple;
+        } else {
+          // right (inner) slots reached their end and are rewound if there are still tuples in the left slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(leftNext, aTuple);
+            // NOTE(review): qualifier result discarded here as well
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
+            return outTuple;
+          }
+        }
+      } // the second if end false
+    } // for
+  }
+
+
+  /**
+   * Restarts the join. Besides the slots and their cursors, all look-ahead tuples and
+   * end-of-input flags are put back to their initial values; otherwise a scan that had
+   * already finished would keep reporting end-of-stream after the rescan.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    posRightTupleSlots = -1;
+    posLeftTupleSlots = -1;
+    leftTuple = null;
+    rightTuple = null;
+    leftNext = null;
+    end = false;
+    endInPopulationStage = false;
+    initRightDone = false;
+  }
+
+  /** Closes the parent operator and drops the buffered tuples and plan references. */
+  @Override
+  public void close() throws IOException {
+    super.close();
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    leftTupleSlots = null;
+    rightTupleSlots = null;
+    joinNode = null;
+    joinQual = null;
+    projector = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
new file mode 100644
index 0000000..470e1c9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Physical operator that evaluates an inner join by merging its two inputs.
+ *
+ * <p>{@link #next()} advances both children until their join keys compare equal,
+ * buffers all tuples sharing that key on each side and then emits every buffered
+ * pair for which the join qualifier is true.</p>
+ *
+ * <p>The tuple returned by {@link #next()} is a single shared instance that is
+ * overwritten by the following call.</p>
+ *
+ * <p>NOTE(review): the merge presumably relies on both children delivering tuples
+ * ordered by the sort keys given to the constructor -- confirm with the planner.</p>
+ */
+public class MergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+
+  // temporary tuples and state for the merge join
+  private FrameTuple frameTuple;
+  // look-ahead tuple of each child; null means "not fetched yet" or "exhausted"
+  private Tuple outerTuple = null;
+  private Tuple innerTuple = null;
+  // shared output tuple, refilled by the projector on every match
+  private Tuple outTuple = null;
+  // buffered outer tuple currently being combined with the inner slots
+  private Tuple outerNext = null;
+
+  // tuples of the current key group and the cursors used to pair them up
+  private List<Tuple> outerTupleSlots;
+  private List<Tuple> innerTupleSlots;
+  private Iterator<Tuple> outerIterator;
+  private Iterator<Tuple> innerIterator;
+
+  // compares an outer tuple with an inner tuple on the join keys
+  private JoinTupleComparator joincomparator = null;
+  // [0] compares two outer tuples, [1] compares two inner tuples
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  // true once no further key group can be produced
+  private boolean end = false;
+
+  // projection
+  private Projector projector;
+
+  /**
+   * @param context      the task attempt context handed to the parent operator
+   * @param plan         the join node; it must carry a join qualifier
+   * @param outer        the outer (left) input
+   * @param inner        the inner (right) input
+   * @param outerSortKey sort specs of the outer input used to build the join comparator
+   * @param innerSortKey sort specs of the inner input used to build the join comparator
+   * @throws IllegalArgumentException if {@code plan} has no join qualifier
+   */
+  public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+      PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+
+    this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = outerSortKey;
+    sortSpecs[1] = innerSortKey;
+
+    this.joincomparator = new JoinTupleComparator(outer.getSchema(),
+        inner.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+    this.outerIterator = outerTupleSlots.iterator();
+    this.innerIterator = innerTupleSlots.iterator();
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+  }
+
+  /** Returns the logical join node this operator was created from. */
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  /**
+   * Produces the next joined tuple.
+   *
+   * @return the shared output tuple, or {@code null} when no further match exists;
+   *         once {@code null} has been returned, later calls keep returning {@code null}
+   * @throws IOException if reading from a child fails or a tuple cannot be cloned
+   */
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      // both cursors drained: the current key group is done, look for the next one
+      if (!outerIterator.hasNext() && !innerIterator.hasNext()) {
+        if (end) {
+          return null;
+        }
+
+        if (outerTuple == null) {
+          outerTuple = leftChild.next();
+        }
+        if (innerTuple == null) {
+          innerTuple = rightChild.next();
+        }
+        // an empty input cannot contribute to an inner join; do not hand a
+        // null tuple to the comparator
+        if (outerTuple == null || innerTuple == null) {
+          end = true;
+          return null;
+        }
+
+        outerTupleSlots.clear();
+        innerTupleSlots.clear();
+        // the old iterators are stale after clear(); refresh them so that
+        // hasNext() reflects the emptied slots
+        outerIterator = outerTupleSlots.iterator();
+        innerIterator = innerTupleSlots.iterator();
+
+        // advance the side with the smaller key until both keys are equal
+        int cmp;
+        while ((cmp = joincomparator.compare(outerTuple, innerTuple)) != 0) {
+          if (cmp > 0) {
+            innerTuple = rightChild.next();
+          } else if (cmp < 0) {
+            outerTuple = leftChild.next();
+          }
+          if (innerTuple == null || outerTuple == null) {
+            end = true;
+            return null;
+          }
+        }
+
+        try {
+          // buffer every outer tuple that compares equal to the first one of the group
+          previous = outerTuple.clone();
+          do {
+            outerTupleSlots.add(outerTuple.clone());
+            outerTuple = leftChild.next();
+            if (outerTuple == null) {
+              end = true;
+              break;
+            }
+          } while (tupleComparator[0].compare(previous, outerTuple) == 0);
+          outerIterator = outerTupleSlots.iterator();
+          outerNext = outerIterator.next();
+
+          // same for the inner side
+          previous = innerTuple.clone();
+          do {
+            innerTupleSlots.add(innerTuple.clone());
+            innerTuple = rightChild.next();
+            if (innerTuple == null) {
+              end = true;
+              break;
+            }
+          } while (tupleComparator[1].compare(previous, innerTuple) == 0);
+          innerIterator = innerTupleSlots.iterator();
+        } catch (CloneNotSupportedException e) {
+          // continuing with half-filled slots would produce wrong results
+          throw new IOException(e);
+        }
+      }
+
+      // inner slots consumed for the current outer tuple: take the next outer
+      // tuple and rewind the inner slots
+      if (!innerIterator.hasNext()) {
+        outerNext = outerIterator.next();
+        innerIterator = innerTupleSlots.iterator();
+      }
+
+      frameTuple.set(outerNext, innerIterator.next());
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
+      }
+    }
+  }
+
+  /**
+   * Restarts the join. Besides the slots and their cursors, the look-ahead tuples and
+   * the end flag are reset; otherwise a finished scan would keep reporting
+   * end-of-stream and tuples of the previous scan would leak into the new one.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    outerTupleSlots.clear();
+    innerTupleSlots.clear();
+    outerIterator = outerTupleSlots.iterator();
+    innerIterator = innerTupleSlots.iterator();
+    outerTuple = null;
+    innerTuple = null;
+    outerNext = null;
+    end = false;
+  }
+
+  /** Closes the parent operator and drops the buffered tuples and plan references. */
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    outerTupleSlots.clear();
+    innerTupleSlots.clear();
+    outerTupleSlots = null;
+    innerTupleSlots = null;
+    outerIterator = null;
+    innerIterator = null;
+    joinQual = null;
+    projector = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
new file mode 100644
index 0000000..6e5900e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class NLJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  private EvalNode joinQual;
+
+
+  // temporal tuples and states for nested loop join
+  private boolean needNewOuter;
+  private FrameTuple frameTuple;
+  private Tuple outerTuple = null;
+  private Tuple innerTuple = null;
+  private Tuple outTuple = null;
+
+  // projection
+  private final Projector projector;
+
+  public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+      PhysicalExec inner) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    needNewOuter = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNewOuter) {
+        outerTuple = leftChild.next();
+        if (outerTuple == null) {
+          return null;
+        }
+        needNewOuter = false;
+      }
+
+      innerTuple = rightChild.next();
+      if (innerTuple == null) {
+        needNewOuter = true;
+        rightChild.rescan();
+        continue;
+      }
+
+      frameTuple.set(outerTuple, innerTuple);
+      if (joinQual != null) {
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outTuple);
+          return outTuple;
+        }
+      } else {
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNewOuter = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
new file mode 100644
index 0000000..5c17c40
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Nested-loop left outer join: for every tuple of the left child the right child is
+ * scanned completely and then rescanned. Matching pairs are emitted; a left tuple that
+ * found no match is emitted once with a null-padded right side.
+ *
+ * <p>The tuple returned by {@link #next()} is a single shared instance that is
+ * overwritten by the following call.</p>
+ */
+public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  // stays null when the plan carries no join qualifier
+  private EvalNode joinQual;
+
+  // temporary tuples and state for the nested-loop join
+  // true when the next left tuple has to be fetched before scanning the right side
+  private boolean needNextLeftTuple;
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+
+  // projection
+  private final Projector projector;
+
+  // whether the current left tuple has produced at least one output tuple
+  private boolean foundAtLeastOneMatch;
+  private int rightNumCols;
+
+  /**
+   * @param context    the task attempt context handed to the parent operator
+   * @param plan       the join node supplying schemas, targets and the optional qualifier
+   * @param leftChild  the left (preserved) input
+   * @param rightChild the right input, rescanned once per left tuple
+   */
+  public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                             PhysicalExec rightChild) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    needNextLeftTuple = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+
+    foundAtLeastOneMatch = false;
+    rightNumCols = rightChild.getSchema().size();
+  }
+
+  /** Returns the logical join node this operator was created from. */
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Produces the next joined tuple.
+   *
+   * @return the shared output tuple, or {@code null} once the left child is exhausted
+   * @throws IOException if reading from or rescanning a child fails
+   */
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNextLeftTuple) {
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          return null;
+        }
+        needNextLeftTuple = false;
+        // a new tuple from the left child has initially no matches on the right operand
+        foundAtLeastOneMatch = false;
+      }
+      rightTuple = rightChild.next();
+
+      if (rightTuple == null) {
+        // the scan of the right operand is finished for the current left tuple
+        if (!foundAtLeastOneMatch) {
+          // no match at all: output the left tuple with a null-padded right side
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(frameTuple, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          foundAtLeastOneMatch = true;
+          needNextLeftTuple = true;
+          rightChild.rescan();
+          return outTuple;
+        } else {
+          needNextLeftTuple = true;
+          rightChild.rescan();
+          continue;
+        }
+      }
+
+      frameTuple.set(leftTuple, rightTuple);
+      // without a join qualifier every pair matches, as in NLJoinExec; the
+      // qualifier must not be dereferenced when the plan did not provide one
+      if (joinQual == null || joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
+        foundAtLeastOneMatch = true;
+        return outTuple;
+      }
+    }
+  }
+
+  /** Restarts the join so that the next call fetches a fresh left tuple. */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNextLeftTuple = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
new file mode 100644
index 0000000..7f86ba2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A scanner that reads multiple partitions, one fragment after another.
+ *
+ * <p>{@link #init()} creates one {@link SeqScanExec} per fragment; {@link #next()}
+ * drains them in order and reports end-of-stream after the last one.</p>
+ */
+public class PartitionMergeScanExec extends PhysicalExec {
+  private final ScanNode plan;
+  // scanner that next() is currently reading from; null until rescan() selects one
+  private SeqScanExec currentScanner = null;
+
+  private CatalogProtos.FragmentProto [] fragments;
+
+  // one scanner per fragment, in fragment order
+  private List<SeqScanExec> scanners = Lists.newArrayList();
+  // cursor over 'scanners'; set by rescan() and cleared by close()
+  private Iterator<SeqScanExec> iterator;
+
+  private AbstractStorageManager sm;
+
+  // value reported by getProgress() while no cursor is set
+  private float progress;
+  // statistics merged from all scanners when the operator is closed
+  protected TableStats inputStats;
+
+  /**
+   * @param context   the task attempt context handed to every per-fragment scanner
+   * @param sm        the storage manager handed to every per-fragment scanner
+   * @param plan      the scan node; a clone of it is given to each scanner
+   * @param fragments the fragments to read, one scanner is created for each
+   * @throws IOException declared for callers; nothing in this constructor reads data
+   */
+  public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+                                ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.sm = sm;
+    this.plan = plan;
+    this.fragments = fragments;
+    this.inputStats = new TableStats();
+  }
+
+  /** Builds the per-fragment scanners and positions the operator on the first one. */
+  public void init() throws IOException {
+    for (int i = 0; i < fragments.length; i++) {
+      CatalogProtos.FragmentProto[] single = new CatalogProtos.FragmentProto[] {fragments[i]};
+      ScanNode clonedPlan = (ScanNode) PlannerUtil.clone(null, plan);
+      scanners.add(new SeqScanExec(context, sm, clonedPlan, single));
+    }
+    progress = 0.0f;
+    rescan();
+  }
+
+  /**
+   * Returns the next tuple of the current scanner, switching to the following scanner
+   * whenever the current one is drained.
+   *
+   * @return the next tuple, or {@code null} when no scanner has tuples left
+   */
+  @Override
+  public Tuple next() throws IOException {
+    while (currentScanner != null) {
+      Tuple fetched = currentScanner.next();
+      if (fetched != null) {
+        return fetched;
+      }
+
+      if (!iterator.hasNext()) {
+        // the last scanner is drained
+        break;
+      }
+
+      // close the drained scanner and open the following one
+      currentScanner.close();
+      currentScanner = iterator.next();
+      currentScanner.init();
+    }
+    return null;
+  }
+
+  /** Moves back to the first scanner and initializes it; a no-op without scanners. */
+  @Override
+  public void rescan() throws IOException {
+    if (!scanners.isEmpty()) {
+      iterator = scanners.iterator();
+      currentScanner = iterator.next();
+      currentScanner.init();
+    }
+  }
+
+  /** Closes every scanner, merges their input statistics and marks progress as complete. */
+  @Override
+  public void close() throws IOException {
+    for (SeqScanExec each : scanners) {
+      each.close();
+      TableStats stats = each.getInputStats();
+      if (stats != null) {
+        inputStats.merge(stats);
+      }
+    }
+    iterator = null;
+    progress = 1.0f;
+  }
+
+  /** Returns the table name of the scan node. */
+  public String getTableName() {
+    return plan.getTableName();
+  }
+
+  /**
+   * Returns the average progress of all scanners while a cursor is set, otherwise the
+   * stored progress value.
+   */
+  @Override
+  public float getProgress() {
+    if (iterator == null) {
+      return progress;
+    }
+
+    float total = 0.0f;
+    for (SeqScanExec each : scanners) {
+      total += each.getProgress();
+    }
+    // average progress: the sum divided by the number of scanners
+    return total > 0 ? total / (float)(scanners.size()) : 0.0f;
+  }
+
+  /** Returns the statistics object that close() merges the scanner statistics into. */
+  @Override
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
new file mode 100644
index 0000000..6d544a7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Base class for partitioners that assign a tuple to a partition.
+ * It only validates and stores the partition key ids and the number of
+ * partitions; the actual assignment is left to {@link #getPartition(Tuple)}.
+ */
+public abstract class Partitioner {
+  /** Partition key ids as passed to the constructor (the array is not copied). */
+  protected final int [] partitionKeyIds;
+  /** Number of partitions; may be zero. */
+  protected final int numPartitions;
+
+  /**
+   * @param keyList partition key ids; must not be null. An empty array is
+   *                accepted (array lengths are never negative, so no length
+   *                check is performed here).
+   * @param numPartitions the number of partitions; must not be negative
+   * @throws IllegalArgumentException if <code>keyList</code> is null or
+   *         <code>numPartitions</code> is negative
+   */
+  public Partitioner(final int [] keyList, final int numPartitions) {
+    Preconditions.checkArgument(keyList != null,
+        "Partition keys must be given");
+    // In outer join, zero can be passed into this value because of empty tables.
+    // So, we should allow zero.
+    Preconditions.checkArgument(numPartitions >= 0,
+        "The number of partitions must be non-negative: %s", numPartitions);
+    this.partitionKeyIds = keyList;
+    this.numPartitions = numPartitions;
+  }
+
+  /**
+   * Determines the partition of the given tuple.
+   *
+   * @param tuple the tuple to assign
+   * @return the partition chosen by the implementation
+   */
+  public abstract int getPartition(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
new file mode 100644
index 0000000..e30a10b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Common base of the physical executors. It keeps the task attempt context,
+ * the input and output schemas, and declares the
+ * init / next / rescan / close / getProgress operations that concrete
+ * executors implement.
+ */
+public abstract class PhysicalExec implements SchemaObject {
+  protected final TaskAttemptContext context;
+  protected Schema inSchema;
+  protected Schema outSchema;
+  /** Number of columns of the output schema, captured at construction time. */
+  protected int outColumnNum;
+
+  /**
+   * @param context   the task attempt context this executor runs in
+   * @param inSchema  the input schema
+   * @param outSchema the output schema; its size is stored in <code>outColumnNum</code>
+   */
+  public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
+                      final Schema outSchema) {
+    this.context = context;
+    this.inSchema = inSchema;
+    this.outSchema = outSchema;
+    this.outColumnNum = outSchema.size();
+  }
+
+  /** Returns the output schema of this executor. */
+  public final Schema getSchema() {
+    return outSchema;
+  }
+
+  public abstract void init() throws IOException;
+
+  public abstract Tuple next() throws IOException;
+
+  public abstract void rescan() throws IOException;
+
+  public abstract void close() throws IOException;
+
+  public abstract float getProgress();
+
+  /** Prepends the task id of the context, in square brackets, to a log message. */
+  private String withTaskId(String message) {
+    return "[" + context.getTaskId() + "] " + message;
+  }
+
+  /** Logs the message at INFO level, prefixed with the task id. */
+  protected void info(Log log, String message) {
+    log.info(withTaskId(message));
+  }
+
+  /** Logs the message at WARN level, prefixed with the task id. */
+  protected void warn(Log log, String message) {
+    log.warn(withTaskId(message));
+  }
+
+  /** Logs the message at FATAL level, prefixed with the task id. */
+  protected void fatal(Log log, String message) {
+    log.fatal(withTaskId(message));
+  }
+
+  /** Returns a new path whose name is a freshly generated random UUID. */
+  protected Path getExecutorTmpDir() {
+    String randomName = UUID.randomUUID().toString();
+    return new Path(randomName);
+  }
+
+  /**
+   * Returns the input statistics of this executor. This base implementation
+   * has none and returns null.
+   */
+  public TableStats getInputStats() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
new file mode 100644
index 0000000..738db62
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+/**
+ * Visitor over physical executors. There is one <code>visit*</code> method per
+ * concrete {@link PhysicalExec} type; each one receives a caller-defined
+ * context, the executor being visited and a stack of {@link PhysicalExec}s,
+ * and may throw {@link PhysicalPlanningException}.
+ *
+ * NOTE(review): the stack presumably holds the executors on the path from the
+ * root to the visited one - confirm against the implementing visitor.
+ *
+ * @param <CONTEXT> type of the object passed along to every visit method
+ * @param <RESULT> type of the value returned by every visit method
+ */
+public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+  RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashBasedColPartitionStore(CONTEXT context, HashBasedColPartitionStoreExec exec,
+                                         Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  // Note: the method name says "LeftHashSemi" while the executor type is HashLeftSemiJoinExec.
+  RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitSortBasedColPartitionStore(CONTEXT context, SortBasedColPartitionStoreExec exec,
+                                         Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+
+  RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
new file mode 100644
index 0000000..fdd1839
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+/**
+ * Helper methods for working with a tree of {@link PhysicalExec}s.
+ */
+public class PhysicalPlanUtil {
+  /**
+   * Looks for an executor of the given class, starting at <code>plan</code>.
+   *
+   * @param plan  the executor to start the search from
+   * @param clazz the executor class to look for; subclasses match as well
+   * @return whatever the search visitor returns, cast to the caller's expected
+   *         type. The cast is unchecked, so the caller must ask for a type
+   *         compatible with <code>clazz</code>.
+   * @throws PhysicalPlanningException if the underlying visitor fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
+      throws PhysicalPlanningException {
+    FindVisitor finder = new FindVisitor();
+    Stack<PhysicalExec> stack = new Stack<PhysicalExec>();
+    PhysicalExec found = finder.visit(plan, stack, clazz);
+    return (T) found;
+  }
+
+  /**
+   * Visitor that stops at the first executor whose class is assignable to the
+   * target class; any other executor is handed to the superclass'
+   * <code>visit</code>.
+   */
+  private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
+    public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+        throws PhysicalPlanningException {
+      if (!target.isAssignableFrom(exec.getClass())) {
+        // not the executor we are looking for: let the base visitor continue
+        return super.visit(exec, stack, target);
+      }
+      return exec;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
new file mode 100644
index 0000000..62add1e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by the physical planning code in this package.
+ * It is an {@link IOException}, so callers that handle IOException also
+ * catch it.
+ */
+public class PhysicalPlanningException extends IOException {
+  /**
+   * @param message the detail message
+   */
+  public PhysicalPlanningException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param ioe the exception to record as the cause
+   */
+  public PhysicalPlanningException(Exception ioe) {
+    super(ioe);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
new file mode 100644
index 0000000..ee6ef1d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Physical executor that evaluates the targets of a {@link Projectable} plan
+ * node on every tuple produced by its child.
+ */
+public class ProjectionExec extends UnaryPhysicalExec {
+  private Projectable plan;
+
+  // for projection
+  private Tuple outTuple;
+  private Projector projector;
+
+  /**
+   * @param context the task attempt context
+   * @param plan    the plan node providing the schemas and the targets
+   * @param child   the executor producing the input tuples
+   */
+  public ProjectionExec(TaskAttemptContext context, Projectable plan,
+      PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+  }
+
+  /**
+   * Runs the superclass initialization, then allocates the output tuple and
+   * builds the projector from the plan's targets.
+   */
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    outTuple = new VTuple(outSchema.size());
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+  }
+
+  /**
+   * Projects the next tuple of the child.
+   *
+   * @return the projected tuple, or null once the child returns null. The same
+   *         output tuple instance is reused on every call.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple input = child.next();
+
+    if (input != null) {
+      projector.eval(input, outTuple);
+      return outTuple;
+    }
+
+    return null;
+  }
+
+  /**
+   * Runs the superclass close and then drops the reference to the plan node.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+    plan = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
new file mode 100644
index 0000000..68379d1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * <code>RangeShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle key ranges. The file outputs are stored with index files on local disks.
+ * <code>RangeShuffleFileWriteExec</code> is implemented with an assumption that input tuples are sorted in an
+ * specified order of shuffle keys.
+ */
+public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
+  private final SortSpec[] sortSpecs;
+  // positions of the sort key columns within the input schema; filled in init()
+  private int [] indexKeys = null;
+  private Schema keySchema;
+
+  private BSTIndex.BSTIndexWriter indexWriter;
+  private TupleComparator comp;
+  private FileAppender appender;
+  private TableMeta meta;
+
+  /**
+   * @param context   the task attempt context
+   * @param sm        storage manager (not used by this constructor)
+   * @param child     the executor producing the tuples to store
+   * @param inSchema  the input schema
+   * @param outSchema the schema the data file is written with
+   * @param sortSpecs the sort specs whose sort keys form the index key
+   */
+  public RangeShuffleFileWriteExec(final TaskAttemptContext context, final AbstractStorageManager sm,
+                                   final PhysicalExec child, final Schema inSchema, final Schema outSchema,
+                                   final SortSpec[] sortSpecs) throws IOException {
+    super(context, inSchema, outSchema, child);
+    this.sortSpecs = sortSpecs;
+  }
+
+  /**
+   * Resolves the index key columns, creates the "output" directory under the
+   * task's working directory, and opens the data appender
+   * (&lt;workDir&gt;/output/output) and the index writer
+   * (&lt;workDir&gt;/output/index).
+   */
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    indexKeys = new int[sortSpecs.length];
+    keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+    // map every sort key to its column id in the input schema
+    Column col;
+    for (int i = 0 ; i < sortSpecs.length; i++) {
+      col = sortSpecs[i].getSortKey();
+      indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
+    }
+
+    BSTIndex bst = new BSTIndex(new TajoConf());
+    this.comp = new TupleComparator(keySchema, sortSpecs);
+    Path storeTablePath = new Path(context.getWorkDir(), "output");
+    LOG.info("Output data directory: " + storeTablePath);
+    // use the store type of the data channel when there is one, RAW otherwise
+    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+    this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+        outSchema, new Path(storeTablePath, "output"));
+    this.appender.enableStats();
+    this.appender.init();
+    this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    this.indexWriter.setLoadNum(100);
+    this.indexWriter.open();
+  }
+
+  /**
+   * Drains the child: every tuple is appended to the data file, and an index
+   * entry (key, offset) is written whenever the key differs from the key of
+   * the previous tuple.
+   *
+   * @return always null
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+    Tuple prevKeyTuple = null;
+    long offset;
+
+    while((tuple = child.next()) != null) {
+      // the offset must be taken before the tuple is appended
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      keyTuple = new VTuple(keySchema.size());
+      RowStoreUtil.project(tuple, keyTuple, indexKeys);
+      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
+        indexWriter.write(keyTuple, offset);
+        prevKeyTuple = keyTuple;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+  }
+
+  /**
+   * Flushes and closes the appender and the index writer, then reports the
+   * appender's statistics and the shuffle file output to the context.
+   * Writers that were never opened (init() did not complete, or close() was
+   * already called) are skipped instead of failing with a NullPointerException.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    if (appender != null) {
+      appender.flush();
+      IOUtils.cleanup(LOG, appender);
+    }
+    if (indexWriter != null) {
+      indexWriter.flush();
+      IOUtils.cleanup(LOG, indexWriter);
+    }
+
+    if (appender != null) {
+      // Collect statistics data
+      context.setResultStats(appender.getStats());
+      context.addShuffleFileOutput(0, context.getTaskId().toString());
+    }
+    appender = null;
+    indexWriter = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
new file mode 100644
index 0000000..c70174a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sort-merge implementation of a right outer join: right tuples without a
+ * matching left tuple are emitted with a NULL-padded left side.
+ * NOTE(review): both children are presumably expected to deliver their tuples
+ * sorted on the join keys - confirm against the planner.
+ */
+public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+
+  // temporary tuples and state for the merge join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private Tuple nextLeft = null;
+
+  // tuples of the current matching key, buffered per side
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> innerTupleSlots;
+
+  private JoinTupleComparator joinComparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  // set once one of the children has returned null
+  private boolean end = false;
+
+  // projection
+  private Projector projector;
+
+  // NOTE(review): rightNumCols is never assigned or read in this class.
+  private int rightNumCols;
+  private int leftNumCols;
+  // read positions within the slots; -1 means that no round has been started yet
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  // true when 'end' was reached while the slots were being filled
+  private boolean endInPopulationStage = false;
+  // true once the right child has been read at least once
+  private boolean initRightDone = false;
+
+  /**
+   * @param context      the task attempt context
+   * @param plan         the join node; it must have a join qualifier
+   * @param outer        the left child
+   * @param inner        the right child
+   * @param outerSortKey sort specs of the left child
+   * @param innerSortKey sort specs of the right child
+   */
+  public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                                 PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+
+    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = outerSortKey;
+    sortSpecs[1] = innerSortKey;
+
+    this.joinComparator = new JoinTupleComparator(outer.getSchema(), inner.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+
+    leftNumCols = outer.getSchema().size();
+  }
+
+  /** Returns the join node this executor was created from. */
+  public JoinNode getPlan() {
+    return this.joinNode;
+  }
+
+  /**
+   * creates a tuple of a given size filled with NULL values in all fields
+   */
+  private Tuple createNullPaddedTuple(int columnNum){
+    VTuple tuple = new VTuple(columnNum);
+    for (int i = 0; i < columnNum; i++) {
+      tuple.put(i, DatumFactory.createNullDatum());
+    }
+    return tuple;
+  }
+
+  /**
+   * Returns the next joined tuple.
+   *
+   * A new round starts when no slots are buffered yet or when both slot lists have been fully consumed.
+   * Each new round goes through the stages marked in the body:
+   * <ul>
+   *   <li>finalizing stage: once <code>end</code> is set, every remaining right tuple is emitted with a
+   *   NULL-padded left side until the right child returns null</li>
+   *   <li>initialization stage: reads a tuple from each side if none is currently held</li>
+   *   <li>move forwarding stage: advances the cursors until the join comparator reports a match;
+   *   a right tuple that compares lower than the left tuple is emitted with a NULL-padded left side</li>
+   *   <li>population stage: buffers the consecutive tuples with the matched key of each side in the slots</li>
+   * </ul>
+   * Afterwards the buffered left and right slots are combined pair by pair, one result per call.
+   *
+   * @return the next result tuple, or null when the join is exhausted. The same output tuple
+   *         instance is reused on every call.
+   * @throws IOException
+   */
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      boolean newRound = false;
+      // no round has been started yet
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      // all combinations of the buffered slots have been emitted
+      if ((posRightTupleSlots == innerTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if (newRound) {
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN FINALIZING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // The finalizing stage, where remaining tuples on the only right are transformed into left-padded results
+        if (end) {
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if(rightTuple == null) {
+            return null;
+          } else {
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(frameTuple, outTuple);
+
+            // we simulate we found a match, which is exactly the null padded one
+            rightTuple = rightChild.next();
+
+            return outTuple;
+          }
+        }
+        //////////////////////////////////////////////////////////////////////
+        // END FINALIZING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN INITIALIZATION STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // This stage reads the first tuple on each side
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+
+          if (leftTuple == null) {
+            end = true;
+            continue;
+          }
+        }
+
+        if(rightTuple == null){
+          rightTuple = rightChild.next();
+
+          if(rightTuple != null){
+            initRightDone = true;
+          }
+          else {
+            initRightDone = true;
+            end = true;
+            continue;
+          }
+        }
+        //////////////////////////////////////////////////////////////////////
+        // END INITIALIZATION STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        innerTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN MOVE FORWARDING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // This stage moves forward a tuple cursor on each side relation until a match
+        // is found
+        int cmp;
+        while ((end != true) && ((cmp = joinComparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          // if right is lower than the left tuple, it means that all right tuples s.t. cmp <= 0 are
+          // matched tuples.
+          if (cmp > 0) {
+            // before getting a new tuple from the right,  a left null padded tuple should be built
+            // output a tuple with the nulls padded left tuple
+            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(frameTuple, outTuple);
+
+            // we simulate we found a match, which is exactly the null padded one
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // If the left tuple is lower than the right tuple, just move forward the left tuple cursor.
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+              // in original algorithm we had return null ,
+              // but now we need to continue the end processing phase for remaining unprocessed right tuples
+            }
+          } // if (cmp<0)
+        } // while
+        //////////////////////////////////////////////////////////////////////
+        // END MOVE FORWARDING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // once a match is found, retain all tuples with this key in tuple slots on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endOuter = false;
+          boolean endInner = false;
+
+          // buffer the left tuples that compare equal to the first matched left tuple
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if( leftTuple == null) {
+              endOuter = true;
+            }
+          } while ((endOuter != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+          // buffer the right tuples that compare equal to the first matched right tuple
+          previous = new VTuple(rightTuple);
+
+          do {
+            innerTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endInner = true;
+            }
+
+          } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          // one side ran out while buffering: the slots still have to be emitted before finalizing
+          if ((endOuter == true) || (endInner == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+        } // if end false
+      } // if newRound
+
+
+      // Now output result matching tuples from the slots
+      // if either we haven't reached end on neither side, or we did reach end on one(or both) sides
+      // but that happened in the slots population step (i.e. refers to next round)
+
+      if ((end == false) || ((end == true) && (endInPopulationStage == true))){
+
+        // first output of this round: take the first buffered left tuple
+        if(posLeftTupleSlots == 0){
+          nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+
+        if(posRightTupleSlots <= (innerTupleSlots.size() -1)) {
+
+          Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+
+          frameTuple.set(nextLeft, aTuple);
+          // NOTE(review): the value returned by this evaluation is not used here
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
+          return outTuple;
+
+        } else {
+          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(nextLeft, aTuple);
+            // NOTE(review): the value returned by this evaluation is not used here
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
+            return outTuple;
+          }
+        }
+      } // the second if end false
+    } // for
+  }
+
+  /**
+   * Runs the superclass rescan, then clears the buffered slots and resets their positions.
+   * NOTE(review): end, endInPopulationStage, initRightDone, leftTuple and rightTuple are not
+   * reset here - confirm whether a rescan after the join has ended is expected to work.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    leftTupleSlots.clear();
+    innerTupleSlots.clear();
+    posRightTupleSlots = -1;
+    posLeftTupleSlots = -1;
+  }
+
+  /**
+   * Runs the superclass close, then clears the slots and drops the references
+   * to the slots, the plan node, the join qualifier and the projector.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+    leftTupleSlots.clear();
+    innerTupleSlots.clear();
+    leftTupleSlots = null;
+    innerTupleSlots = null;
+    joinNode = null;
+    joinQual = null;
+    projector = null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
new file mode 100644
index 0000000..2e676e9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.SelectionNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Physical operator that filters the tuples produced by its child, passing
+ * through only those for which the selection qualifier evaluates to true.
+ */
+public class SelectionExec extends UnaryPhysicalExec  {
+  /** Filter condition taken from the selection plan node. */
+  private final EvalNode qual;
+
+  public SelectionExec(TaskAttemptContext context,
+                       SelectionNode plan,
+                       PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    qual = plan.getQual();
+  }
+
+  /**
+   * Pulls tuples from the child until one satisfies the qualifier.
+   *
+   * @return the first child tuple for which the qualifier is true, or
+   *         {@code null} once the child returns {@code null}
+   */
+  @Override
+  public Tuple next() throws IOException {
+    for (Tuple candidate = child.next(); candidate != null; candidate = child.next()) {
+      if (qual.eval(inSchema, candidate).isTrue()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
new file mode 100644
index 0000000..6dbcc3f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.*;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+/**
+ * Physical operator that reads a table by scanning its fragments, applying the
+ * scan node's search condition and targets to each tuple.
+ *
+ * <p>When the plan marks the table as a broadcast table, scanned tuples are shared
+ * through {@code TupleCache}: the operator that obtains the broadcast scan lock scans
+ * the table and fills the cache; the other operators wait for it and then read the
+ * cached tuples. Whenever the cache cannot be used, the operator falls back to
+ * scanning the fragments itself.</p>
+ */
+public class SeqScanExec extends PhysicalExec {
+  private ScanNode plan;
+
+  private Scanner scanner = null;
+
+  private EvalNode qual = null;
+
+  private CatalogProtos.FragmentProto [] fragments;
+
+  private Projector projector;
+
+  // Snapshot of the scanner's input statistics, taken in close().
+  private TableStats inputStats;
+
+  // Non-null only when the plan marks the table as a broadcast table.
+  private TupleCacheKey cacheKey;
+
+  // True once the scanner reads from the tuple cache instead of the fragments.
+  private boolean cacheRead = false;
+
+  public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+                     ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.plan = plan;
+    this.qual = plan.getQual();
+    this.fragments = fragments;
+
+    if (plan.isBroadcastTable()) {
+      cacheKey = new TupleCacheKey(
+          context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName());
+    }
+  }
+
+  /**
+   * This method rewrites the input schema of a column-partitioned table because
+   * the data files of a column-partitioned table do not contain the actual values
+   * of the partition key columns. So, this method removes the partition key columns
+   * from the input schema and replaces the references to them in the targets and
+   * the search condition by constants built from the partition path.
+   *
+   * TODO - This implementation assumes that a fragment is always a FileFragment.
+   * In a column-partitioned table, the path has an important role to
+   * indicate the partition keys. For now, this is right. Later, we have to fix it.
+   */
+  private void rewriteColumnPartitionedTableSchema() throws IOException {
+    PartitionMethodDesc partitionDesc = plan.getTableDesc().getPartitionMethod();
+    Schema columnPartitionSchema = SchemaUtil.clone(partitionDesc.getExpressionSchema());
+    String qualifier = inSchema.getColumn(0).getQualifier();
+    columnPartitionSchema.setQualifier(qualifier);
+
+    // Remove partition key columns from an input schema.
+    this.inSchema = plan.getTableDesc().getSchema();
+
+    List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+
+    // Get a partition key value from a given path
+    Tuple partitionRow =
+        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+
+    // Targets or search conditions may contain column references.
+    // However, the actual values are absent in tuples. So, replace all column references by constant datums.
+    for (Column column : columnPartitionSchema.toArray()) {
+      FieldEval targetExpr = new FieldEval(column);
+      Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
+      ConstEval constExpr = new ConstEval(datum);
+
+      for (Target target : plan.getTargets()) {
+        if (target.getEvalTree().equals(targetExpr)) {
+          // The whole target is the partition column: keep its name as the alias.
+          if (!target.hasAlias()) {
+            target.setAlias(target.getEvalTree().getName());
+          }
+          target.setExpr(constExpr);
+        } else {
+          EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
+        }
+      }
+
+      if (plan.hasQual()) {
+        EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
+      }
+    }
+  }
+
+  /**
+   * Prepares this operator for reading: rewrites the schema of a column-partitioned
+   * table if needed, computes the projected schema, and opens either a scanner on
+   * the tuple cache (broadcast tables) or a scanner on the fragments.
+   */
+  public void init() throws IOException {
+    Schema projected;
+
+    if (fragments != null
+        && plan.getTableDesc().hasPartition()
+        && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
+    if (plan.hasTargets()) {
+      // Project only the input columns referenced by the search condition or the targets.
+      projected = new Schema();
+      Set<Column> columnSet = new HashSet<Column>();
+
+      if (plan.hasQual()) {
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+      }
+
+      for (Target t : plan.getTargets()) {
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+      }
+
+      for (Column column : inSchema.getColumns()) {
+        if (columnSet.contains(column)) {
+          projected.addColumn(column);
+        }
+      }
+    } else {
+      projected = outSchema;
+    }
+
+    if (cacheKey != null) {
+      TupleCache tupleCache = TupleCache.getInstance();
+      if (tupleCache.isBroadcastCacheReady(cacheKey)) {
+        openCacheScannerOrScan(projected);
+      } else {
+        if (TupleCache.getInstance().lockBroadcastScan(cacheKey)) {
+          // This operator got the lock: scan the table, fill the cache, then read from it.
+          scanAndAddCache(projected);
+          openCacheScannerOrScan(projected);
+        } else {
+          // Another operator is filling the cache: wait up to 20 seconds on the monitor.
+          Object lockMonitor = tupleCache.getLockMonitor();
+          synchronized (lockMonitor) {
+            try {
+              lockMonitor.wait(20 * 1000);
+            } catch (InterruptedException e) {
+              // Stop waiting, but keep the interrupt visible to the caller.
+              Thread.currentThread().interrupt();
+            }
+          }
+          if (tupleCache.isBroadcastCacheReady(cacheKey)) {
+            openCacheScannerOrScan(projected);
+          } else {
+            initScanner(projected);
+          }
+        }
+      }
+    } else {
+      initScanner(projected);
+    }
+  }
+
+  /**
+   * Creates the projector and, when fragments are given, a scanner over them:
+   * a merge scanner for several fragments, a plain scanner for a single one.
+   */
+  private void initScanner(Schema projected) throws IOException {
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    if (fragments != null) {
+      if (fragments.length > 1) {
+        this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
+            FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
+                fragments), projected
+        );
+      } else {
+        this.scanner = StorageManagerFactory.getStorageManager(
+            context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragments[0],
+            projected);
+      }
+      scanner.init();
+    }
+  }
+
+  /**
+   * Tries to open a scanner on the tuple cache. On success the cache scanner becomes
+   * the active scanner and {@code cacheRead} is set; otherwise nothing changes.
+   */
+  private void openCacheScanner() throws IOException {
+    Scanner cacheScanner = TupleCache.getInstance().openCacheScanner(cacheKey, plan.getPhysicalSchema());
+    if (cacheScanner != null) {
+      scanner = cacheScanner;
+      cacheRead = true;
+    }
+  }
+
+  /**
+   * Opens a scanner on the tuple cache and, if the cache yields no scanner,
+   * falls back to scanning the fragments so that {@link #next()} always has
+   * a scanner to read from when fragments are given.
+   */
+  private void openCacheScannerOrScan(Schema projected) throws IOException {
+    openCacheScanner();
+    if (!cacheRead) {
+      initScanner(projected);
+    }
+  }
+
+  /**
+   * Scans all tuples of this operator through {@link #next()} and registers them
+   * in the tuple cache under this operator's cache key.
+   */
+  private void scanAndAddCache(Schema projected) throws IOException {
+    initScanner(projected);
+
+    List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
+    while (true) {
+      Tuple tuple = next();
+      if (tuple != null) {
+        broadcastTupleCacheList.add(tuple);
+      } else {
+        break;
+      }
+    }
+
+    // initScanner() creates no scanner when there are no fragments.
+    if (scanner != null) {
+      scanner.close();
+      scanner = null;
+    }
+
+    TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList);
+  }
+
+  /**
+   * Returns the next output tuple.
+   *
+   * <p>Tuples read from the tuple cache are returned as they are. Tuples read from
+   * the fragments are filtered by the search condition, if any, and projected.</p>
+   *
+   * @return the next tuple, or {@code null} when there are no fragments or the
+   *         scanner is exhausted
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (fragments == null) {
+      return null;
+    }
+
+    Tuple tuple;
+    Tuple outTuple = new VTuple(outColumnNum);
+
+    if (!plan.hasQual()) {
+      if ((tuple = scanner.next()) != null) {
+        if (cacheRead) {
+          return tuple;
+        }
+        projector.eval(tuple, outTuple);
+        outTuple.setOffset(tuple.getOffset());
+        return outTuple;
+      } else {
+        return null;
+      }
+    } else {
+      while ((tuple = scanner.next()) != null) {
+        if (cacheRead) {
+          return tuple;
+        }
+        if (qual.eval(inSchema, tuple).isTrue()) {
+          projector.eval(tuple, outTuple);
+          return outTuple;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Resets the scanner so that the scan starts over. Does nothing when no scanner
+   * is open (for example, when there are no fragments).
+   */
+  @Override
+  public void rescan() throws IOException {
+    if (scanner != null) {
+      scanner.reset();
+    }
+  }
+
+  /**
+   * Closes the scanner, keeps a copy of its input statistics for later calls to
+   * {@link #getInputStats()}, and releases the references held by this operator.
+   */
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, scanner);
+    if (scanner != null) {
+      try {
+        TableStats stat = scanner.getInputStats();
+        if (stat != null) {
+          inputStats = (TableStats)(stat.clone());
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+    }
+    scanner = null;
+    plan = null;
+    qual = null;
+    projector = null;
+  }
+
+  public String getTableName() {
+    return plan.getTableName();
+  }
+
+  /**
+   * @return the scanner's progress, or 1.0 when no scanner is open
+   */
+  @Override
+  public float getProgress() {
+    if (scanner == null) {
+      return 1.0f;
+    } else {
+      return scanner.getProgress();
+    }
+  }
+
+  /**
+   * @return the live statistics of the open scanner, or the snapshot taken by
+   *         {@link #close()} when no scanner is open
+   */
+  @Override
+  public TableStats getInputStats() {
+    if (scanner != null) {
+      return scanner.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
+
+  @Override
+  public String toString() {
+    // close() drops the plan; toString() must still be usable afterwards.
+    if (plan == null) {
+      return "SeqScanExec:(closed)";
+    }
+    if (scanner != null) {
+      return "SeqScanExec:" + plan.getTableName() + "," + scanner.getClass().getName();
+    } else {
+      return "SeqScanExec:" + plan.getTableName();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
new file mode 100644
index 0000000..629889d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Sort Aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * If lastKey is null or lastKey is equal to the current key, sort aggregation is in this state.
+ * In this state, this operator aggregates measure values via aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * If the current key is different from the last key, it computes the final aggregation results
+ * of the last key, makes an output tuple, and starts a new group with the current tuple.
+ *
+ * <p>If the child produces no tuples at all, no group is ever opened and this operator
+ * produces no output tuple.</p>
+ */
+public class SortAggregateExec extends AggregationExec {
+  // Grouping key of the group being aggregated; null until the first tuple is seen.
+  private Tuple lastKey = null;
+  // True once the last group has been emitted (or the input turned out to be empty).
+  private boolean finished = false;
+  // One aggregation context per aggregation function, for the current group.
+  private final FunctionContext[] contexts;
+
+  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
+    super(context, plan, child);
+    contexts = new FunctionContext[plan.getAggFunctions().length];
+  }
+
+  /**
+   * Consumes child tuples until a group is complete and returns its aggregated tuple.
+   *
+   * @return the output tuple of the next group, or {@code null} when all groups have
+   *         been emitted or the child produced no tuples
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+
+      // get a key tuple
+      Tuple currentKey = new VTuple(groupingKeyIds.length);
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        currentKey.put(i, tuple.get(groupingKeyIds[i]));
+      }
+
+      if (lastKey == null) {
+        /** Aggregation State: the very first tuple opens the first group */
+        startGroup(currentKey, tuple);
+      } else if (lastKey.equals(currentKey)) {
+        /** Aggregation State: same group, keep aggregating */
+        for (int i = 0; i < aggFunctionsNum; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else {
+        /** Finalization State: emit the finished group, then open the next one */
+        Tuple outputTuple = finalizeGroup();
+        startGroup(currentKey, tuple);
+        return outputTuple;
+      }
+    } // while loop
+
+    if (finished) {
+      return null;
+    }
+    finished = true;
+
+    // No tuple was ever consumed: there is neither a key nor any aggregation
+    // context to finalize, so there is nothing to emit.
+    if (lastKey == null) {
+      return null;
+    }
+
+    // Emit the last group.
+    return finalizeGroup();
+  }
+
+  /** Opens a new group for the given key and aggregates its first tuple. */
+  private void startGroup(Tuple key, Tuple firstTuple) throws IOException {
+    for (int i = 0; i < aggFunctionsNum; i++) {
+      contexts[i] = aggFunctions[i].newContext();
+      aggFunctions[i].merge(contexts[i], inSchema, firstTuple);
+    }
+    lastKey = key;
+  }
+
+  /** Builds the output tuple of the current group: the grouping keys followed by the final aggregates. */
+  private Tuple finalizeGroup() throws IOException {
+    Tuple outputTuple = new VTuple(outSchema.size());
+    int tupleIdx = 0;
+
+    for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+      outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+    }
+    for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+      outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
+    }
+    return outputTuple;
+  }
+
+  /**
+   * Rescans through the superclass and forgets the current group so that
+   * aggregation starts over.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    lastKey = null;
+    finished = false;
+  }
+}