You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/07 10:26:46 UTC

[1/2] TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)

Updated Branches:
  refs/heads/master 1023c8269 -> d7645252f


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
new file mode 100644
index 0000000..665a3e3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+/**
+ * Prepare a hash table of the NOT IN side of the join. Scan the FROM side table.
+ * For each tuple of the FROM side table, it tries to find a matched tuple from the hash table for the NOT INT side.
+ * If not found, it returns the tuple of the FROM side table with null padding.
+ */
+public class HashLeftAntiJoinExec extends HashJoinExec {
+  private Tuple rightNullTuple;
+
+  public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+                              PhysicalExec notInSideChild) {
+    super(context, plan, fromSideChild, notInSideChild);
+    // NUll Tuple
+    rightNullTuple = new VTuple(leftChild.outColumnNum);
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
+   * next() method finds the first unmatched tuple from both tables.
+   *
+   * For each left tuple, next() tries to find the right tuple from the hash table. If there is no hash bucket
+   * in the hash table. It returns a tuple. If next() find the hash bucket in the hash table, it reads tuples in
+   * the found bucket sequentially. If it cannot find tuple in the bucket, it returns a tuple.
+   *
+   * @return The tuple which is unmatched to a given join condition.
+   * @throws IOException
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
+      }
+
+      // Try to find a hash bucket in in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      if (tupleSlots.containsKey(leftKeyTuple)) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = tupleSlots.get(leftKeyTuple).iterator();
+      } else {
+        // if not found, it returns a tuple.
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        return outTuple;
+      }
+
+      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+      // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+          notFound = false;
+        }
+      }
+
+      if (notFound) { // if there is no matched tuple
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
new file mode 100644
index 0000000..cc12690
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
+
+  public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                               PhysicalExec rightChild) {
+    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+        plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          finished = true;
+          return null;
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
new file mode 100644
index 0000000..8c8a6b1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+/**
+ * Prepare a hash table of the NOT IN side of the join. Scan the FROM side table.
+ * For each tuple of the FROM side table, it tries to find a matched tuple from the hash table for the NOT INT side.
+ * If found, it returns the tuple of the FROM side table.
+ */
+public class HashLeftSemiJoinExec extends HashJoinExec {
+  private Tuple rightNullTuple;
+
+  public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+                              PhysicalExec inSideChild) {
+    super(context, plan, fromSideChild, inSideChild);
+    // NUll Tuple
+    rightNullTuple = new VTuple(leftChild.outColumnNum);
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
+   * next() method finds the first unmatched tuple from both tables.
+   *
+   * For each left tuple on the disk, next() tries to find at least one matched tuple from the hash table.
+   *
+   * In more detail, until there is a hash bucket matched to the left tuple in the hash table, it continues to traverse
+   * the left tuples. If next() finds the matched bucket in the hash table, it finds any matched tuple in the bucket.
+   * If found, it returns the composite tuple immediately without finding more matched tuple in the bucket.
+   *
+   * @return The tuple which is firstly matched to a given join condition.
+   * @throws java.io.IOException
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
+      }
+
+      // Try to find a hash bucket in in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      if (tupleSlots.containsKey(leftKeyTuple)) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = tupleSlots.get(leftKeyTuple).iterator();
+      } else {
+        continue;
+      }
+
+      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+      // If it finds any matched tuple, it returns the tuple immediately.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+          notFound = false;
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+        }
+      }
+
+      if (!notFound) { // if there is no matched tuple
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
deleted file mode 100644
index 36b84a8..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-
-public class HashSemiJoinExec extends HashJoinExec {
-  private Tuple rightNullTuple;
-
-  public HashSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
-                          PhysicalExec right) {
-    super(context, plan, left, right);
-    // NUll Tuple
-    rightNullTuple = new VTuple(leftChild.outColumnNum);
-    for (int i = 0; i < leftChild.outColumnNum; i++) {
-      rightNullTuple.put(i, NullDatum.get());
-    }
-  }
-
-  /**
-   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
-   * next() method finds the first unmatched tuple from both tables.
-   *
-   * For each left tuple on the disk, next() tries to find the right tuple from the hash table.
-   * Until there is a hash bucket matched to the left tuple in the hash table, it continues to traverse the left
-   * tuples. If next() finds the matched bucket in the hash table, it finds any matched tuple in the bucket.
-   * If found, it returns the composite tuple immediately without finding more matched tuple in the bucket.
-   *
-   * @return The tuple which is firstly matched to a given join condition.
-   * @throws java.io.IOException
-   */
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean notFound;
-
-    while(!finished) {
-
-      // getting new outer
-      leftTuple = leftChild.next(); // it comes from a disk
-      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-        finished = true;
-        return null;
-      }
-
-      // Try to find a hash bucket in in-memory hash table
-      getKeyLeftTuple(leftTuple, leftKeyTuple);
-      if (tupleSlots.containsKey(leftKeyTuple)) {
-        // if found, it gets a hash bucket from the hash table.
-        iterator = tupleSlots.get(leftKeyTuple).iterator();
-      } else {
-        continue;
-      }
-
-      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
-      // If it finds any matched tuple, it returns the tuple immediately.
-      notFound = true;
-      while (notFound && iterator.hasNext()) {
-        rightTuple = iterator.next();
-        frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
-          notFound = false;
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-        }
-      }
-
-      if (!notFound) { // if there is no matched tuple
-        break;
-      }
-    }
-
-    return outTuple;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
deleted file mode 100644
index bbf39dd..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.*;
-import org.apache.tajo.datum.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
-public class LeftOuterHashJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  protected JoinNode plan;
-  protected EvalNode joinQual;
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  // projection
-  protected final Projector projector;
-  protected final EvalContext [] evalContexts;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private static final Log LOG = LogFactory.getLog(LeftOuterHashJoinExec.class);
-
-  public LeftOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
-                               PhysicalExec rightChild) {
-    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
-        plan.getOutSchema(), leftChild, rightChild);
-    this.plan = plan;
-    this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-    leftKeyTuple = new VTuple(leftKeyList.length);
-
-    leftNumCols = leftChild.getSchema().getColumnNum();
-    rightNumCols = rightChild.getSchema().getColumnNum();
-  }
-
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
-  }
-
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!finished) {
-
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          finished = true;
-          return null;
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
-          iterator = tupleSlots.get(leftKeyTuple).iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          shouldGetLeftTuple = true;
-          return outTuple;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        found = true;
-      }
-
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
-      }
-
-      if (found) {
-        break;
-      }
-    }
-
-    return outTuple;
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-
-    while ((tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      List<Tuple> newValue;
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
-      }
-
-      if (tupleSlots.containsKey(keyTuple)) {
-        newValue = tupleSlots.get(keyTuple);
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-      }
-    }
-    first = false;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
-  }
-
-  public void close() throws IOException {
-    tupleSlots.clear();
-  }
-
-  public JoinNode getPlan() {
-    return this.plan;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
deleted file mode 100644
index cc7b331..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-
-public class LeftOuterNLJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  private JoinNode plan;
-  private EvalNode joinQual;
-
-  // temporal tuples and states for nested loop join
-  private boolean needNextRightTuple;
-  private FrameTuple frameTuple;
-  private Tuple leftTuple = null;
-  private Tuple rightTuple = null;
-  private Tuple outTuple = null;
-  private EvalContext qualCtx;
-
-  // projection
-  private final EvalContext [] evalContexts;
-  private final Projector projector;
-
-  private boolean foundAtLeastOneMatch;
-  private int rightNumCols;
-
-  public LeftOuterNLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
-                             PhysicalExec rightChild) {
-    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
-    this.plan = plan;
-
-    if (plan.hasJoinQual()) {
-      this.joinQual = plan.getJoinQual();
-      this.qualCtx = this.joinQual.newContext();
-    }
-
-    // for projection
-    projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.renew();
-
-    // for join
-    needNextRightTuple = true;
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-
-    foundAtLeastOneMatch = false;
-    rightNumCols = rightChild.getSchema().getColumnNum();
-  }
-
-  public JoinNode getPlan() {
-    return this.plan;
-  }
-
-  public Tuple next() throws IOException {
-    for (;;) {
-      if (needNextRightTuple) {
-        leftTuple = leftChild.next();
-        if (leftTuple == null) {
-          return null;
-        }
-        needNextRightTuple = false;
-        // a new tuple from the left child has initially no matches on the right operand
-        foundAtLeastOneMatch = false;
-      }
-      rightTuple = rightChild.next();
-
-      if (rightTuple == null) {
-        // the scan of the right operand is finished with no matches found
-        if(foundAtLeastOneMatch == false){
-          //output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          foundAtLeastOneMatch = true;
-          needNextRightTuple = true;
-          rightChild.rescan();
-          return outTuple;
-        } else {
-          needNextRightTuple = true;
-          rightChild.rescan();
-          continue;
-        }
-      }
-
-      frameTuple.set(leftTuple, rightTuple);
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        foundAtLeastOneMatch = true;
-        return outTuple;
-      }
-    }
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    needNextRightTuple = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
new file mode 100644
index 0000000..e3a861e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  // temporal tuples and states for nested loop join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private Tuple leftNext = null;
+
+  private final List<Tuple> leftTupleSlots;
+  private final List<Tuple> rightTupleSlots;
+
+  private JoinTupleComparator joincomparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  private boolean end = false;
+
+  // projection
+  private final Projector projector;
+  private final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  boolean endInPopulationStage = false;
+  private boolean initRightDone = false;
+
+  public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = this.joinQual.newContext();
+
+    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = leftSortKey;
+    sortSpecs[1] = rightSortKey;
+
+    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+        rightChild.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      boolean newRound = false;
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if(newRound == true){
+
+        if (end) {
+
+          ////////////////////////////////////////////////////////////////////////
+          // FINALIZING STAGE
+          ////////////////////////////////////////////////////////////////////////
+          // the finalizing stage, where remaining tuples on the right are
+          // transformed into left-padded results while tuples on the left
+          // are transformed into right-padded results
+
+          // before exit, a left-padded tuple should be built for all remaining
+          // right side and a right-padded tuple should be built for all remaining
+          // left side
+
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if((leftTuple == null) && (rightTuple == null)) {
+            return null;
+          }
+
+          if((leftTuple == null) && (rightTuple != null)){
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            rightTuple = rightChild.next();
+            return outTuple;
+          }
+
+          if((leftTuple != null) && (rightTuple == null)){
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            leftTuple = leftChild.next();
+            return outTuple;
+          }
+        } // if end
+
+        ////////////////////////////////////////////////////////////////////////
+        // INITIALIZING STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // initializing stage, reading the first tuple on each side
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+          if( leftTuple == null){
+            end = true;
+            continue;
+          }
+        }
+        if (rightTuple == null) {
+          rightTuple = rightChild.next();
+          initRightDone = true;
+          if (rightTuple == null) {
+            end = true;
+            continue;
+          }
+        }
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        rightTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+        ////////////////////////////////////////////////////////////////////////
+        // Comparison and Move Forward Stage
+        ////////////////////////////////////////////////////////////////////////
+        // advance alternatively on each side until a match is found
+        int cmp;
+        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          if (cmp > 0) {
+
+            //before getting a new tuple from the right,  a leftnullpadded tuple should be built
+            //output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // before getting a new tuple from the left,  a rightnullpadded tuple should be built
+            // output a tuple with the nulls padded rightTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            // BEFORE RETURN, MOVE FORWARD
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } // if (cmp < 0)
+        } //while
+
+
+        ////////////////////////////////////////////////////////////////////////
+        // SLOTS POPULATION STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // once a match is found, retain all tuples with this key in tuple slots
+        // on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endLeft = false;
+          boolean endRight = false;
+
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              endLeft = true;
+            }
+
+
+          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+
+          previous = new VTuple(rightTuple);
+          do {
+            rightTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endRight = true;
+            }
+
+          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          if ((endLeft == true) || (endRight == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+
+        } // if end false
+      } // if newRound
+
+
+      ////////////////////////////////////////////////////////////////////////
+      // RESULTS STAGE
+      ////////////////////////////////////////////////////////////////////////
+      // now output result matching tuples from the slots
+      // if either we haven't reached end on neither side, or we did reach end
+      // on one(or both) sides but that happened in the slots population step
+      // (i.e. refers to next round)
+      if(!end || (end && endInPopulationStage)){
+        if(posLeftTupleSlots == 0){
+          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+          frameTuple.set(leftNext, aTuple);
+          joinQual.eval(qualCtx, inSchema, frameTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        } else {
+          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(leftNext, aTuple);
+            joinQual.eval(qualCtx, inSchema, frameTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            return outTuple;
+          }
+        }
+      } // the second if end false
+    } // for
+  }
+
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    posRightTupleSlots = -1;
+    posLeftTupleSlots = -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
new file mode 100644
index 0000000..d28dd12
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  private EvalNode joinQual;
+
+  // temporal tuples and states for nested loop join
+  private boolean needNextRightTuple;
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private EvalContext qualCtx;
+
+  // projection
+  private final EvalContext [] evalContexts;
+  private final Projector projector;
+
+  private boolean foundAtLeastOneMatch;
+  private int rightNumCols;
+
+  public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                             PhysicalExec rightChild) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+      this.qualCtx = this.joinQual.newContext();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    evalContexts = projector.renew();
+
+    // for join
+    needNextRightTuple = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    foundAtLeastOneMatch = false;
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNextRightTuple) {
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          return null;
+        }
+        needNextRightTuple = false;
+        // a new tuple from the left child has initially no matches on the right operand
+        foundAtLeastOneMatch = false;
+      }
+      rightTuple = rightChild.next();
+
+      if (rightTuple == null) {
+        // the scan of the right operand is finished with no matches found
+        if(foundAtLeastOneMatch == false){
+          //output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          foundAtLeastOneMatch = true;
+          needNextRightTuple = true;
+          rightChild.rescan();
+          return outTuple;
+        } else {
+          needNextRightTuple = true;
+          rightChild.rescan();
+          continue;
+        }
+      }
+
+      frameTuple.set(leftTuple, rightTuple);
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      if (joinQual.terminate(qualCtx).asBool()) {
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        foundAtLeastOneMatch = true;
+        return outTuple;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNextRightTuple = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index 131fbe5..f0abef5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.planner.physical.*;
-
 import java.util.Stack;
 
 public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
@@ -31,7 +29,7 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
   RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
   RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
   RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
-  RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
   RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index a6fb17a..8c3c92b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -252,6 +252,30 @@ public class TestSQLAnalyzer {
   }
 
   @Test
+  public void testInSubquery1() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/in_subquery_1.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInSubquery2() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/in_subquery_2.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testExistsPredicate1() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/exists_predicate_1.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testExistsPredicate2() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/exists_predicate_2.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
   public void testInsertIntoTable() throws IOException {
     String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_1.sql"));
     parseQuery(sql);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index 6fb7f61..fa8aa36 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -268,7 +267,7 @@ public class TestFullOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -305,7 +304,7 @@ public class TestFullOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -341,7 +340,7 @@ public class TestFullOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -379,7 +378,7 @@ public class TestFullOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
 
     int count = 0;
     exec.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 5fde9b8..3272309 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -35,8 +35,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.SortNode;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -313,7 +311,7 @@ public class TestFullOuterMergeJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
     int count = 0;
     exec.init();
 
@@ -348,7 +346,7 @@ public class TestFullOuterMergeJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -383,7 +381,7 @@ public class TestFullOuterMergeJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
 
 
     int count = 0;
@@ -421,7 +419,7 @@ public class TestFullOuterMergeJoinExec {
     ProjectionExec proj = (ProjectionExec) exec;
 
     // if it chose the hash join WITH REVERSED ORDER, convert to merge join exec
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -458,7 +456,7 @@ public class TestFullOuterMergeJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -495,7 +493,7 @@ public class TestFullOuterMergeJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
 
     int count = 0;
     exec.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 47d6a94..cc890da 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -169,9 +169,9 @@ public class TestHashAntiJoinExec {
 
       // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
       if (scanLeftChild.getTableName().equals("people")) {
-        exec = new HashAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
       } else {
-        exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
       }
     } else if (exec instanceof HashJoinExec) {
       HashJoinExec join = (HashJoinExec) exec;
@@ -179,9 +179,9 @@ public class TestHashAntiJoinExec {
 
       // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
       if (scanLeftChild.getTableName().equals("people")) {
-        exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
       } else {
-        exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 1db8300..eff4d58 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -174,9 +174,9 @@ public class TestHashSemiJoinExec {
 
       // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
       if (scanLeftChild.getTableName().equals("people")) {
-        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
       } else {
-        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
       }
     } else if (exec instanceof HashJoinExec) {
       HashJoinExec join = (HashJoinExec) exec;
@@ -184,9 +184,9 @@ public class TestHashSemiJoinExec {
 
       // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
       if (scanLeftChild.getTableName().equals("people")) {
-        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
       } else {
-        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 5486e8d..b339253 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -34,7 +34,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -270,7 +269,7 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    assertTrue(proj.getChild() instanceof LeftOuterHashJoinExec);
+    assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec);
 
     int count = 0;
     exec.init();
@@ -302,7 +301,7 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
        //for this small data set this is not likely to happen
       
       assertEquals(1, 0);
@@ -342,7 +341,7 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
       //for this small data set this is not likely to happen
       
       assertEquals(1, 0);
@@ -383,7 +382,7 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
       //for this small data set this is not likely to happen
       
       assertEquals(1, 0);
@@ -424,7 +423,7 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
       //for this small data set this is not likely to happen
       
       assertEquals(1, 0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index a946934..00b076a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -259,11 +259,11 @@ public class TestLeftOuterNLJoinExec {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
-    //maybe plan results with hash join exec algorithm usage. Must convert from LeftOuterHashJoinExec into LeftOuterNLJoinExec
+    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
-      LeftOuterHashJoinExec join = proj.getChild();
-      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
     }
@@ -300,11 +300,11 @@ public class TestLeftOuterNLJoinExec {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
-    //maybe plan results with hash join exec algorithm usage. Must convert from LeftOuterHashJoinExec into LeftOuterNLJoinExec
+    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
-      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
-      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
      
@@ -343,11 +343,11 @@ public class TestLeftOuterNLJoinExec {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
-    //maybe plan results with hash join exec algorithm usage. Must convert from LeftOuterHashJoinExec into LeftOuterNLJoinExec
+    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
-      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
-      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
      
@@ -387,11 +387,11 @@ public class TestLeftOuterNLJoinExec {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
-    //maybe plan results with hash join exec algorithm usage. Must convert from LeftOuterHashJoinExec into LeftOuterNLJoinExec
+    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
-      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
-      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
      
@@ -430,11 +430,11 @@ public class TestLeftOuterNLJoinExec {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
-    //maybe plan results with hash join exec algorithm usage. Must convert from LeftOuterHashJoinExec into LeftOuterNLJoinExec
+    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
-      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
-      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
      

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index d237f0c..4b0cece 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -47,7 +47,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-// this is not a physical operator in itself, but it uses the LeftOuterHashJoinExec with switched inputs order
+// this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
 public class TestRightOuterHashJoinExec {
   private TajoConf conf;
   private final String TEST_PATH = "target/test-data/TestRightOuterHashJoinExec";

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_1.sql b/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_1.sql
new file mode 100644
index 0000000..0b10799
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_1.sql
@@ -0,0 +1 @@
+select c1,c2,c3 from table1 where exists (select c4 from table2 where c4 = table1.c1);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_2.sql b/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_2.sql
new file mode 100644
index 0000000..f4f82f9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/exists_predicate_2.sql
@@ -0,0 +1 @@
+select c1,c2,c3 from table1 where not exists (select c4 from table2 where c4 = table1.c1);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/queries/in_subquery_1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/in_subquery_1.sql b/tajo-core/tajo-core-backend/src/test/queries/in_subquery_1.sql
new file mode 100644
index 0000000..2a16a8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/in_subquery_1.sql
@@ -0,0 +1 @@
+select c1,c2,c3 from table1 where c1 in (select c4 from table2);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/test/queries/in_subquery_2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/in_subquery_2.sql b/tajo-core/tajo-core-backend/src/test/queries/in_subquery_2.sql
new file mode 100644
index 0000000..64c8034
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/in_subquery_2.sql
@@ -0,0 +1 @@
+select c1,c2,c3 from table1 where c1 not in (select c4 from table2);
\ No newline at end of file


[2/2] git commit: TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)

Posted by hy...@apache.org.
TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d7645252
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d7645252
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d7645252

Branch: refs/heads/master
Commit: d7645252fb6bc3152d9774c6a655b8c083ff21b6
Parents: 1023c82
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 7 17:02:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Oct 7 17:03:07 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tajo/algebra/BinaryOperator.java |   8 +-
 .../apache/tajo/algebra/ExistsPredicate.java    |  44 +++
 .../java/org/apache/tajo/algebra/OpType.java    |   4 +-
 .../org/apache/tajo/algebra/RelationList.java   |   4 +-
 .../tajo/algebra/SimpleTableSubQuery.java       |  41 +++
 .../tajo/algebra/TablePrimarySubQuery.java      |  54 +++
 .../org/apache/tajo/algebra/TableSubQuery.java  |  54 ---
 .../org/apache/tajo/catalog/TestCatalog.java    |   2 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   3 +-
 .../tajo/engine/parser/HiveConverter.java       |   2 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  23 +-
 .../tajo/engine/planner/AlgebraVisitor.java     |   2 +-
 .../tajo/engine/planner/BaseAlgebraVisitor.java |   6 +-
 .../tajo/engine/planner/LogicalPlanner.java     |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 288 +++++++++++-----
 .../planner/logical/TableSubQueryNode.java      |   2 +-
 .../physical/BasicPhysicalExecutorVisitor.java  |  12 +-
 .../planner/physical/FullOuterHashJoinExec.java | 252 --------------
 .../physical/FullOuterMergeJoinExec.java        | 334 -------------------
 .../planner/physical/HashAntiJoinExec.java      | 105 ------
 .../planner/physical/HashFullOuterJoinExec.java | 252 ++++++++++++++
 .../planner/physical/HashLeftAntiJoinExec.java  | 110 ++++++
 .../planner/physical/HashLeftOuterJoinExec.java | 214 ++++++++++++
 .../planner/physical/HashLeftSemiJoinExec.java  | 107 ++++++
 .../planner/physical/HashSemiJoinExec.java      | 101 ------
 .../planner/physical/LeftOuterHashJoinExec.java | 214 ------------
 .../planner/physical/LeftOuterNLJoinExec.java   | 129 -------
 .../physical/MergeFullOuterJoinExec.java        | 334 +++++++++++++++++++
 .../planner/physical/NLLeftOuterJoinExec.java   | 129 +++++++
 .../physical/PhysicalExecutorVisitor.java       |   6 +-
 .../tajo/engine/parser/TestSQLAnalyzer.java     |  24 ++
 .../physical/TestFullOuterHashJoinExec.java     |   9 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  14 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   8 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   8 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  11 +-
 .../physical/TestLeftOuterNLJoinExec.java       |  40 +--
 .../physical/TestRightOuterHashJoinExec.java    |   2 +-
 .../src/test/queries/exists_predicate_1.sql     |   1 +
 .../src/test/queries/exists_predicate_2.sql     |   1 +
 .../src/test/queries/in_subquery_1.sql          |   1 +
 .../src/test/queries/in_subquery_2.sql          |   1 +
 43 files changed, 1600 insertions(+), 1361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f990530..14f935a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-232: Rename join operators and add other join operators to
+    PhysicalPlanner. (hyunsik)
+
     TAJO-229: Implement JoinGraph to represent a graph of relation joins.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
index 3e00e5e..a090623 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
@@ -32,16 +32,16 @@ public class BinaryOperator extends Expr {
     this.right = right;
   }
 
-  public Expr getLeft() {
-    return this.left;
+  public <T extends Expr> T getLeft() {
+    return (T) this.left;
   }
 
   public void setLeft(Expr left) {
     this.left = left;
   }
 
-  public Expr getRight() {
-    return this.right;
+  public <T extends Expr> T getRight() {
+    return (T) this.right;
   }
 
   public void setRight(Expr right) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
new file mode 100644
index 0000000..0823807
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class ExistsPredicate extends Expr {
+  private SimpleTableSubQuery simpleTableSubQuery;
+  private boolean not;
+
+  public ExistsPredicate(SimpleTableSubQuery simpleTableSubQuery, boolean not) {
+    super(OpType.InPredicate);
+    this.simpleTableSubQuery = simpleTableSubQuery;
+    this.not = not;
+  }
+
+  public boolean isNot() {
+    return this.not;
+  }
+
+  public SimpleTableSubQuery getSubQuery() {
+    return simpleTableSubQuery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    ExistsPredicate another = (ExistsPredicate) expr;
+    return not == another.not && simpleTableSubQuery.equals(another.simpleTableSubQuery);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
index 145c52c..799b196 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
@@ -28,7 +28,8 @@ public enum OpType {
   Relation(Relation.class),
   RelationList(RelationList.class),
   Rename,
-  TableSubQuery(TableSubQuery.class),
+  SimpleTableSubQuery(SimpleTableSubQuery.class),
+  TablePrimaryTableSubQuery(TablePrimarySubQuery.class),
   Except(SetOperation.class),
   Having(Having.class),
 	Aggregation(Aggregation.class),
@@ -66,6 +67,7 @@ public enum OpType {
   InPredicate(InPredicate.class),
   ValueList(ValueListExpr.class),
   Is,
+  ExistsPredicate(ExistsPredicate.class),
 
   // string operator or pattern matching predicates
   LikePredicate(PatternMatchPredicate.class),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
index 8e9f878..918ff46 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
@@ -33,8 +33,8 @@ public class RelationList extends Expr {
       Preconditions.checkArgument(
           rel.getType() == OpType.Relation ||
           rel.getType() == OpType.Join ||
-          rel.getType() == OpType.TableSubQuery,
-          "Only Relation, Join, or TableSubQuery can be given to RelationList, but this expr "
+          rel.getType() == OpType.TablePrimaryTableSubQuery,
+          "Only Relation, Join, or TablePrimarySubQuery can be given to RelationList, but this expr "
               + " is " + rel.getType());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
new file mode 100644
index 0000000..91442dc
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class SimpleTableSubQuery extends Expr {
+  private Expr subquery;
+
+  public SimpleTableSubQuery(Expr subquery) {
+    super(OpType.SimpleTableSubQuery);
+    this.subquery = subquery;
+  }
+
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    return subquery.equals(subquery);
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
new file mode 100644
index 0000000..929e9b2
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class TablePrimarySubQuery extends Relation {
+  private Expr subquery;
+  private String [] columnNames;
+
+  public TablePrimarySubQuery(String relName, Expr subquery) {
+    super(OpType.TablePrimaryTableSubQuery, relName);
+    this.subquery = subquery;
+  }
+
+  public boolean hasColumnNames() {
+    return this.columnNames != null;
+  }
+
+  public void setColumnNames(String[] aliasList) {
+    this.columnNames = aliasList;
+  }
+
+  public String [] getColumnNames() {
+    return columnNames;
+  }
+
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    return subquery.equals(subquery);
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
deleted file mode 100644
index 55c44ba..0000000
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.algebra;
-
-public class TableSubQuery extends Relation {
-  private Expr subquery;
-  private String [] columnNames;
-
-  public TableSubQuery(String relName, Expr subquery) {
-    super(OpType.TableSubQuery, relName);
-    this.subquery = subquery;
-  }
-
-  public boolean hasColumnNames() {
-    return this.columnNames != null;
-  }
-
-  public void setColumnNames(String[] aliasList) {
-    this.columnNames = aliasList;
-  }
-
-  public String [] getColumnNames() {
-    return columnNames;
-  }
-
-  public Expr getSubQuery() {
-    return subquery;
-  }
-
-  @Override
-  boolean equalsTo(Expr expr) {
-    return subquery.equals(subquery);
-  }
-
-  public String toJson() {
-    return JsonHelper.toJson(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index fd1893b..97824b2 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -48,7 +48,7 @@ public class TestCatalog {
 	public static void setUp() throws Exception {
     TajoConf conf = new TajoConf();
 
-    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db");
+    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db;create=true");
     conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "127.0.0.1:0");
 
 	  server = new CatalogServer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 6fc8166..1c6816a 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -831,6 +831,7 @@ predicate
   | in_predicate
   | pattern_matching_predicate
   | null_predicate
+  | exists_predicate
   ;
 
 /*
@@ -944,7 +945,7 @@ some : SOME | ANY;
 */
 
 exists_predicate
-  : EXISTS s=table_subquery
+  : NOT? EXISTS s=table_subquery
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
index 0442e77..135d146 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
@@ -501,7 +501,7 @@ public class HiveConverter extends HiveParserBaseVisitor<Expr>{
                 }
             }
 
-            TableSubQuery subQuery = new TableSubQuery(tableAlias, current);
+            TablePrimarySubQuery subQuery = new TablePrimarySubQuery(tableAlias, current);
             current = subQuery;
         }
         // TODO: implement lateralView

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 021e0ba..49494a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -400,7 +400,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       }
       return relation;
     } else if (ctx.derived_table() != null) {
-      return new TableSubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
+      return new TablePrimarySubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
     } else {
       return null;
     }
@@ -628,13 +628,17 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   }
 
   @Override
-  public ValueListExpr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) {
-    int size = ctx.in_value_list().row_value_expression().size();
-    Expr [] exprs = new Expr[size];
-    for (int i = 0; i < size; i++) {
-      exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+  public Expr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) {
+    if (checkIfExist(ctx.in_value_list())) {
+      int size = ctx.in_value_list().row_value_expression().size();
+      Expr [] exprs = new Expr[size];
+      for (int i = 0; i < size; i++) {
+        exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+      }
+      return new ValueListExpr(exprs);
+    } else {
+      return new SimpleTableSubQuery(visitChildren(ctx.table_subquery()));
     }
-    return new ValueListExpr(exprs);
   }
 
   @Override
@@ -692,6 +696,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   }
 
   @Override
+  public ExistsPredicate visitExists_predicate(SQLParser.Exists_predicateContext ctx) {
+    return new ExistsPredicate(new SimpleTableSubQuery(visitTable_subquery(ctx.table_subquery())), ctx.NOT() != null);
+  }
+
+  @Override
   public ColumnReferenceExpr visitColumn_reference(SQLParser.Column_referenceContext ctx) {
     ColumnReferenceExpr column = new ColumnReferenceExpr(ctx.name.getText());
     if (ctx.tb_name != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
index 633609a..345320c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
@@ -34,7 +34,7 @@ public interface AlgebraVisitor<CONTEXT, RESULT> {
   RESULT visitExcept(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
   RESULT visitIntersect(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
   RESULT visitRelationList(CONTEXT ctx, Stack<OpType> stack, RelationList expr) throws PlanningException;
-  RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException;
+  RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException;
   RESULT visitRelation(CONTEXT ctx, Stack<OpType> stack, Relation expr) throws PlanningException;
   RESULT visitCreateTable(CONTEXT ctx, Stack<OpType> stack, CreateTable expr) throws PlanningException;
   RESULT visitDropTable(CONTEXT ctx, Stack<OpType> stack, DropTable expr) throws PlanningException;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 06ab38c..f974e1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -82,8 +82,8 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
       case RelationList:
         current = visitRelationList(ctx, stack, (RelationList) expr);
         break;
-      case TableSubQuery:
-        current = visitTableSubQuery(ctx, stack, (TableSubQuery) expr);
+      case TablePrimaryTableSubQuery:
+        current = visitTableSubQuery(ctx, stack, (TablePrimarySubQuery) expr);
         break;
       case Relation:
         current = visitRelation(ctx, stack, (Relation) expr);
@@ -209,7 +209,7 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
   }
 
   @Override
-  public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+  public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException {
     stack.push(expr.getType());
     RESULT child = visitChild(ctx, stack, expr.getSubQuery());
     stack.pop();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 359dda3..d083ac1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -145,7 +145,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
   }
 
-  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr)
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TablePrimarySubQuery expr)
       throws PlanningException {
     QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
     PlanContext newContext = new PlanContext(context.plan, newBlock);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c6e4695..14514a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -211,34 +211,150 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_SEMI:
+        return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_SEMI:
+        return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_ANTI:
+        return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_ANTI:
+        return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
 
       default:
         throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
     }
   }
 
-  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          // fallback algorithm
+          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+      }
+
+    } else {
+      return new BNLJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+          return new HashJoinExec(context, plan, leftExec, rightExec);
+        case MERGE_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+        case HYBRID_HASH_JOIN:
+
+        default:
+          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+      }
+
+
+    } else {
+      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long leftSize = estimateSizeRecursive(context, leftLineage);
+    long rightSize = estimateSizeRecursive(context, rightLineage);
+
+    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
+
+    boolean hashJoin = false;
+    if (leftSize < threshold || rightSize < threshold) {
+      hashJoin = true;
+    }
+
+    if (hashJoin) {
+      PhysicalExec selectedOuter;
+      PhysicalExec selectedInner;
+
+      // HashJoinExec loads the inner relation to memory.
+      if (leftSize <= rightSize) {
+        selectedInner = leftExec;
+        selectedOuter = rightExec;
+      } else {
+        selectedInner = rightExec;
+        selectedOuter = leftExec;
+      }
+
+      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
+      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+    } else {
+      return createMergeInnerJoin(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
+                                             PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
+        leftExec);
+    ExternalSortExec innerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
+        rightExec);
+
+    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+  }
+
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
         case NESTED_LOOP_JOIN:
           //the right operand is too large, so we opt for NL implementation of left outer join
           LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-          return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+          return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
         default:
           LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
       }
     } else {
       return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
@@ -253,12 +369,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     if (rightTableVolume < threshold) {
       // we can implement left outer join using hash join, using the right operand as the build relation
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+      return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
     }
     else {
       //the right operand is too large, so we opt for NL implementation of left outer join
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-      return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+      return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
@@ -270,7 +386,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     long outerSize4 = estimateSizeRecursive(context, outerLineage4);
     if (outerSize4 < threshold){
       LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+      return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
     } else {
       return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
     }
@@ -290,7 +406,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   }
 
   private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
@@ -298,7 +414,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
         case MERGE_JOIN:
           return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
         default:
@@ -312,7 +428,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   }
 
   private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
@@ -334,7 +450,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                             PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
     String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
@@ -354,10 +470,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       selectedRight = leftExec;
     }
     LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
-    return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+    return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
   }
 
-  private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                               PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
     // if size too large, full outer merge join implementation
@@ -369,7 +485,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
         new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
 
-    return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+    return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
   }
 
   private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -386,114 +502,104 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+  /**
+   *  Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
+   */
+  private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+
         default:
-          // fallback algorithm
-          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
       }
-
     } else {
-      return new BNLJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+  /**
+   *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
+   */
+  private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
-          return new HashJoinExec(context, plan, leftExec, rightExec);
-        case MERGE_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
-          return createMergeJoin(context, plan, leftExec, rightExec);
-        case HYBRID_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
 
         default:
-          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
-          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
-          return createMergeJoin(context, plan, leftExec, rightExec);
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
       }
-
-
     } else {
-      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
     }
   }
 
-  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
-    long leftSize = estimateSizeRecursive(context, leftLineage);
-    long rightSize = estimateSizeRecursive(context, rightLineage);
-
-    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
-
-    boolean hashJoin = false;
-    if (leftSize < threshold || rightSize < threshold) {
-      hashJoin = true;
-    }
-
-    if (hashJoin) {
-      PhysicalExec selectedOuter;
-      PhysicalExec selectedInner;
+  /**
+   *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
+   */
+  private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
 
-      // HashJoinExec loads the inner relation to memory.
-      if (leftSize <= rightSize) {
-        selectedInner = leftExec;
-        selectedOuter = rightExec;
-      } else {
-        selectedInner = rightExec;
-        selectedOuter = leftExec;
+        default:
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
       }
-
-      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
-      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
     } else {
-      return createMergeJoin(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
-  private MergeJoinExec createMergeJoin(TaskAttemptContext context, JoinNode plan,
-                                        PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
-        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
-    ExternalSortExec outerSort = new ExternalSortExec(context, sm,
-        new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
-        leftExec);
-    ExternalSortExec innerSort = new ExternalSortExec(context, sm,
-        new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
-        rightExec);
+  /**
+   *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
+   */
+  private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
 
-    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
-    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+        default:
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+      }
+    } else {
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+    }
   }
 
   public PhysicalExec createStorePlan(TaskAttemptContext ctx,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index 39ae1e2..846ed41 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -79,7 +79,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
 
   @Override
   public PlanString getPlanString() {
-    PlanString planStr = new PlanString("TableSubQuery");
+    PlanString planStr = new PlanString("TablePrimarySubQuery");
     planStr.appendTitle(" as ").appendTitle(tableName);
     return planStr;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index fc569d7..4723ecc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -38,10 +38,10 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
       return visitProjection((ProjectionExec) exec, stack, context);
     } else if (exec instanceof HashJoinExec) {
       return visitHashJoin((HashJoinExec) exec, stack, context);
-    } else if (exec instanceof HashAntiJoinExec) {
-      return visitHashAntiJoin((HashAntiJoinExec) exec, stack, context);
-    } else if (exec instanceof HashSemiJoinExec) {
-      return visitHashSemiJoin((HashSemiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashLeftAntiJoinExec) {
+      return visitHashAntiJoin((HashLeftAntiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashLeftSemiJoinExec) {
+      return visitHashSemiJoin((HashLeftSemiJoinExec) exec, stack, context);
     } else if (exec instanceof LimitExec) {
       return visitLimit((LimitExec) exec, stack, context);
     } else {
@@ -108,13 +108,13 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }
 
   @Override
-  public RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
deleted file mode 100644
index aebfdb5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.*;
-
-
-public class FullOuterHashJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  protected JoinNode plan;
-  protected EvalNode joinQual;
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  // projection
-  protected final Projector projector;
-  protected final EvalContext [] evalContexts;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private Map<Tuple, Boolean> matched;
-
-  public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
-                               PhysicalExec inner) {
-    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
-        plan.getOutSchema(), outer, inner);
-    this.plan = plan;
-    this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
-    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
-    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
-    this.matched = new HashMap<Tuple, Boolean>(10000);
-
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
-        outer.getSchema(), inner.getSchema());
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-    leftKeyTuple = new VTuple(leftKeyList.length);
-
-    leftNumCols = outer.getSchema().getColumnNum();
-    rightNumCols = inner.getSchema().getColumnNum();
-  }
-
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
-  }
-
-  public Tuple getNextUnmatchedRight() {
-
-    List<Tuple> newValue;
-    Tuple returnedTuple;
-    // get a keyTUple from the matched hashmap with a boolean false value
-    for(Tuple aKeyTuple : matched.keySet()) {
-      if(matched.get(aKeyTuple) == false) {
-        newValue = tupleSlots.get(aKeyTuple);
-        returnedTuple = newValue.remove(0);
-        tupleSlots.put(aKeyTuple, newValue);
-
-        // after taking the last element from the list in tupleSlots, set flag true in matched as well
-        if(newValue.isEmpty()){
-          matched.put(aKeyTuple, true);
-        }
-
-        return returnedTuple;
-      }
-    }
-    return null;
-  }
-
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!finished) {
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
-          Tuple unmatchedRightTuple = getNextUnmatchedRight();
-          if( unmatchedRightTuple == null) {
-            finished = true;
-            outTuple = null;
-            return null;
-          } else {
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-
-            return outTuple;
-          }
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
-          iterator = tupleSlots.get(leftKeyTuple).iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
-          //output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          shouldGetLeftTuple = true;
-          return outTuple;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
-      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        found = true;
-        getKeyLeftTuple(leftTuple, leftKeyTuple);
-        matched.put(leftKeyTuple, true);
-      }
-
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
-      }
-
-      if (found) {
-        break;
-      }
-    }
-    return outTuple;
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-
-    while ((tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      List<Tuple> newValue;
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
-      }
-
-      if (tupleSlots.containsKey(keyTuple)) {
-        newValue = tupleSlots.get(keyTuple);
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-        matched.put(keyTuple,false);
-      }
-    }
-    first = false;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
-  }
-
-  public void close() throws IOException {
-    tupleSlots.clear();
-  }
-
-  public JoinNode getPlan() {
-    return this.plan;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
deleted file mode 100644
index 2c11fea..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  private JoinNode joinNode;
-  private EvalNode joinQual;
-  private EvalContext qualCtx;
-
-  // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
-  private Tuple leftTuple = null;
-  private Tuple rightTuple = null;
-  private Tuple outTuple = null;
-  private Tuple leftNext = null;
-
-  private final List<Tuple> leftTupleSlots;
-  private final List<Tuple> rightTupleSlots;
-
-  private JoinTupleComparator joincomparator = null;
-  private TupleComparator[] tupleComparator = null;
-
-  private final static int INITIAL_TUPLE_SLOT = 10000;
-
-  private boolean end = false;
-
-  // projection
-  private final Projector projector;
-  private final EvalContext [] evalContexts;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private int posRightTupleSlots = -1;
-  private int posLeftTupleSlots = -1;
-  boolean endInPopulationStage = false;
-  private boolean initRightDone = false;
-
-  public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
-                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
-    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
-    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
-        "but there is no join condition");
-    this.joinNode = plan;
-    this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
-
-    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    SortSpec[][] sortSpecs = new SortSpec[2][];
-    sortSpecs[0] = leftSortKey;
-    sortSpecs[1] = rightSortKey;
-
-    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
-        rightChild.getSchema(), sortSpecs);
-    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
-        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
-
-    // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-
-    leftNumCols = leftChild.getSchema().getColumnNum();
-    rightNumCols = rightChild.getSchema().getColumnNum();
-  }
-
-  public JoinNode getPlan(){
-    return this.joinNode;
-  }
-
-  public Tuple next() throws IOException {
-    Tuple previous;
-
-    for (;;) {
-      boolean newRound = false;
-      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
-        newRound = true;
-      }
-      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
-        newRound = true;
-      }
-
-      if(newRound == true){
-
-        if (end) {
-
-          ////////////////////////////////////////////////////////////////////////
-          // FINALIZING STAGE
-          ////////////////////////////////////////////////////////////////////////
-          // the finalizing stage, where remaining tuples on the right are
-          // transformed into left-padded results while tuples on the left
-          // are transformed into right-padded results
-
-          // before exit, a left-padded tuple should be built for all remaining
-          // right side and a right-padded tuple should be built for all remaining
-          // left side
-
-          if (initRightDone == false) {
-            // maybe the left operand was empty => the right one didn't have the chance to initialize
-            rightTuple = rightChild.next();
-            initRightDone = true;
-          }
-
-          if((leftTuple == null) && (rightTuple == null)) {
-            return null;
-          }
-
-          if((leftTuple == null) && (rightTuple != null)){
-            // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            rightTuple = rightChild.next();
-            return outTuple;
-          }
-
-          if((leftTuple != null) && (rightTuple == null)){
-            // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-            frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            leftTuple = leftChild.next();
-            return outTuple;
-          }
-        } // if end
-
-        ////////////////////////////////////////////////////////////////////////
-        // INITIALIZING STAGE
-        ////////////////////////////////////////////////////////////////////////
-        // initializing stage, reading the first tuple on each side
-        if (leftTuple == null) {
-          leftTuple = leftChild.next();
-          if( leftTuple == null){
-            end = true;
-            continue;
-          }
-        }
-        if (rightTuple == null) {
-          rightTuple = rightChild.next();
-          initRightDone = true;
-          if (rightTuple == null) {
-            end = true;
-            continue;
-          }
-        }
-
-        // reset tuple slots for a new round
-        leftTupleSlots.clear();
-        rightTupleSlots.clear();
-        posRightTupleSlots = -1;
-        posLeftTupleSlots = -1;
-
-        ////////////////////////////////////////////////////////////////////////
-        // Comparison and Move Forward Stage
-        ////////////////////////////////////////////////////////////////////////
-        // advance alternatively on each side until a match is found
-        int cmp;
-        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
-
-          if (cmp > 0) {
-
-            //before getting a new tuple from the right,  a leftnullpadded tuple should be built
-            //output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // BEFORE RETURN, MOVE FORWARD
-            rightTuple = rightChild.next();
-            if(rightTuple == null) {
-              end = true;
-            }
-
-            return outTuple;
-
-          } else if (cmp < 0) {
-            // before getting a new tuple from the left,  a rightnullpadded tuple should be built
-            // output a tuple with the nulls padded rightTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-            frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            // BEFORE RETURN, MOVE FORWARD
-            leftTuple = leftChild.next();
-            if(leftTuple == null) {
-              end = true;
-            }
-
-            return outTuple;
-
-          } // if (cmp < 0)
-        } //while
-
-
-        ////////////////////////////////////////////////////////////////////////
-        // SLOTS POPULATION STAGE
-        ////////////////////////////////////////////////////////////////////////
-        // once a match is found, retain all tuples with this key in tuple slots
-        // on each side
-        if(!end) {
-          endInPopulationStage = false;
-
-          boolean endLeft = false;
-          boolean endRight = false;
-
-          previous = new VTuple(leftTuple);
-          do {
-            leftTupleSlots.add(new VTuple(leftTuple));
-            leftTuple = leftChild.next();
-            if(leftTuple == null) {
-              endLeft = true;
-            }
-
-
-          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
-          posLeftTupleSlots = 0;
-
-
-          previous = new VTuple(rightTuple);
-          do {
-            rightTupleSlots.add(new VTuple(rightTuple));
-            rightTuple = rightChild.next();
-            if(rightTuple == null) {
-              endRight = true;
-            }
-
-          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
-          posRightTupleSlots = 0;
-
-          if ((endLeft == true) || (endRight == true)) {
-            end = true;
-            endInPopulationStage = true;
-          }
-
-        } // if end false
-      } // if newRound
-
-
-      ////////////////////////////////////////////////////////////////////////
-      // RESULTS STAGE
-      ////////////////////////////////////////////////////////////////////////
-      // now output result matching tuples from the slots
-      // if either we haven't reached end on neither side, or we did reach end
-      // on one(or both) sides but that happened in the slots population step
-      // (i.e. refers to next round)
-      if(!end || (end && endInPopulationStage)){
-        if(posLeftTupleSlots == 0){
-          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
-          posLeftTupleSlots = posLeftTupleSlots + 1;
-        }
-
-        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
-          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
-          posRightTupleSlots = posRightTupleSlots + 1;
-          frameTuple.set(leftNext, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          return outTuple;
-        } else {
-          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
-          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
-            //rewind the right slots position
-            posRightTupleSlots = 0;
-            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
-            posRightTupleSlots = posRightTupleSlots + 1;
-            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
-            posLeftTupleSlots = posLeftTupleSlots + 1;
-
-            frameTuple.set(leftNext, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            return outTuple;
-          }
-        }
-      } // the second if end false
-    } // for
-  }
-
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    leftTupleSlots.clear();
-    rightTupleSlots.clear();
-    posRightTupleSlots = -1;
-    posLeftTupleSlots = -1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
deleted file mode 100644
index 8f2d115..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-
-public class HashAntiJoinExec extends HashJoinExec {
-  private Tuple rightNullTuple;
-
-  public HashAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
-                      PhysicalExec right) {
-    super(context, plan, left, right);
-    // NUll Tuple
-    rightNullTuple = new VTuple(leftChild.outColumnNum);
-    for (int i = 0; i < leftChild.outColumnNum; i++) {
-      rightNullTuple.put(i, NullDatum.get());
-    }
-  }
-
-  /**
-   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
-   * next() method finds the first unmatched tuple from both tables.
-   *
-   * For each left tuple, next() tries to find the right tuple from the hash table. If there is no hash bucket
-   * in the hash table. It returns a tuple. If next() find the hash bucket in the hash table, it reads tuples in
-   * the found bucket sequentially. If it cannot find tuple in the bucket, it returns a tuple.
-   *
-   * @return The tuple which is unmatched to a given join condition.
-   * @throws IOException
-   */
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean notFound;
-
-    while(!finished) {
-
-      // getting new outer
-      leftTuple = leftChild.next(); // it comes from a disk
-      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-        finished = true;
-        return null;
-      }
-
-      // Try to find a hash bucket in in-memory hash table
-      getKeyLeftTuple(leftTuple, leftKeyTuple);
-      if (tupleSlots.containsKey(leftKeyTuple)) {
-        // if found, it gets a hash bucket from the hash table.
-        iterator = tupleSlots.get(leftKeyTuple).iterator();
-      } else {
-        // if not found, it returns a tuple.
-        frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        return outTuple;
-      }
-
-      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
-      // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
-      notFound = true;
-      while (notFound && iterator.hasNext()) {
-        rightTuple = iterator.next();
-        frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
-          notFound = false;
-        }
-      }
-
-      if (notFound) { // if there is no matched tuple
-        frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        break;
-      }
-    }
-
-    return outTuple;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
new file mode 100644
index 0000000..ac591d3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class HashFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private Map<Tuple, Boolean> matched;
+
+  public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
+    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
+    this.matched = new HashMap<Tuple, Boolean>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
+        outer.getSchema(), inner.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = outer.getSchema().getColumnNum();
+    rightNumCols = inner.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple getNextUnmatchedRight() {
+
+    List<Tuple> newValue;
+    Tuple returnedTuple;
+    // get a keyTUple from the matched hashmap with a boolean false value
+    for(Tuple aKeyTuple : matched.keySet()) {
+      if(matched.get(aKeyTuple) == false) {
+        newValue = tupleSlots.get(aKeyTuple);
+        returnedTuple = newValue.remove(0);
+        tupleSlots.put(aKeyTuple, newValue);
+
+        // after taking the last element from the list in tupleSlots, set flag true in matched as well
+        if(newValue.isEmpty()){
+          matched.put(aKeyTuple, true);
+        }
+
+        return returnedTuple;
+      }
+    }
+    return null;
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if( unmatchedRightTuple == null) {
+            finished = true;
+            outTuple = null;
+            return null;
+          } else {
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+
+            return outTuple;
+          }
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
+          //output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
+        matched.put(leftKeyTuple, true);
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+        matched.put(keyTuple,false);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+