You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/02 11:18:02 UTC

git commit: TAJO-142: Implement hash anti-join operator. (hyunsik)

Updated Branches:
  refs/heads/master 205621168 -> 0f3965a00


TAJO-142: Implement hash anti-join operator. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0f3965a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0f3965a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0f3965a0

Branch: refs/heads/master
Commit: 0f3965a0005fb6f041d487b5ce75f3ef6dc2d8e3
Parents: 2056211
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 2 18:13:23 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 2 18:14:34 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/algebra/JoinType.java  |   8 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   2 +-
 .../tajo/engine/planner/LogicalPlanner.java     |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  14 +-
 .../engine/planner/physical/BNLJoinExec.java    |  16 +-
 .../planner/physical/BinaryPhysicalExec.java    |  28 +--
 .../planner/physical/HashAntiJoinExec.java      | 105 +++++++++++
 .../engine/planner/physical/HashJoinExec.java   |  94 ++++-----
 .../engine/planner/physical/MergeJoinExec.java  |  14 +-
 .../engine/planner/physical/NLJoinExec.java     |   6 +-
 .../tajo/engine/planner/physical/UnionExec.java |   4 +-
 .../planner/rewrite/FilterPushDownRule.java     |   2 +-
 .../planner/physical/TestBNLJoinExec.java       |  22 +--
 .../planner/physical/TestHashAntiJoinExec.java  | 189 +++++++++++++++++++
 .../planner/physical/TestHashJoinExec.java      |  10 +-
 .../planner/physical/TestMergeJoinExec.java     |   8 +-
 .../engine/planner/physical/TestNLJoinExec.java |   8 +-
 18 files changed, 417 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c871c83..635205a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-142: Implement hash anti-join operator. (hyunsik)
+
     TAJO-94: Remove duplicate proto files. (hyunsik)
 
     TAJO-141: Set on demand as the default cluster mode. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
index 3e0a137..b506719 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/JoinType.java
@@ -19,10 +19,14 @@
 package org.apache.tajo.algebra;
 
 public enum JoinType {
-  CROSS_JOIN,
+  CROSS,
   INNER,
   LEFT_OUTER,
   RIGHT_OUTER,
   FULL_OUTER,
-  UNION
+  UNION,
+  LEFT_ANTI,
+  RIGHT_ANTI,
+  LEFT_SEMI,
+  RIGHT_SEMI
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index aad275d..ed62171 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -299,7 +299,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   public Join visitJoined_table_primary(SQLParser.Joined_table_primaryContext ctx) {
     Join join;
     if (ctx.CROSS() != null) {
-      join = new Join(JoinType.CROSS_JOIN);
+      join = new Join(JoinType.CROSS);
     } else if (ctx.UNION() != null) {
       join = new Join(JoinType.UNION);
     } else { // qualified join or natural

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 2c4f241..f78b367 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -245,7 +245,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   private static LogicalNode createCatasianProduct(LogicalNode left, LogicalNode right) {
-    JoinNode join = new JoinNode(JoinType.CROSS_JOIN, left, right);
+    JoinNode join = new JoinNode(JoinType.CROSS, left, right);
     Schema joinSchema = SchemaUtil.merge(
         join.getLeftChild().getOutSchema(),
         join.getRightChild().getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index a2e38b1..06ac2d6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -150,8 +150,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                      PhysicalExec outer, PhysicalExec inner)
       throws IOException {
     switch (joinNode.getJoinType()) {
-      case CROSS_JOIN:
-        LOG.info("The planner chooses NLJoinExec");
+      case CROSS:
+        LOG.info("The planner chooses [Nested Loop Join]");
         return new NLJoinExec(ctx, joinNode, outer, inner);
 
       case INNER:
@@ -180,7 +180,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
             selectedOuter = outer;
           }
 
-          LOG.info("The planner chooses HashJoinExec");
+          LOG.info("The planner chooses [InMemory Hash Join]");
           return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
         }
 
@@ -194,7 +194,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
             new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
             inner);
 
-        LOG.info("The planner chooses MergeJoinExec");
+        LOG.info("The planner chooses [Merge Join]");
         return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
             sortSpecs[0], sortSpecs[1]);
     }
@@ -243,7 +243,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                         GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
     Column[] grpColumns = groupbyNode.getGroupingColumns();
     if (grpColumns.length == 0) {
-      LOG.info("The planner chooses HashAggregationExec");
+      LOG.info("The planner chooses [Hash Aggregation]");
       return new HashAggregateExec(ctx, groupbyNode, subOp);
     } else {
       String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
@@ -253,7 +253,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       // if the relation size is less than the reshold,
       // the hash aggregation will be used.
       if (estimatedSize <= threshold) {
-        LOG.info("The planner chooses HashAggregationExec");
+        LOG.info("The planner chooses [Hash Aggregation]");
         return new HashAggregateExec(ctx, groupbyNode, subOp);
       } else {
         SortSpec[] specs = new SortSpec[grpColumns.length];
@@ -266,7 +266,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         // SortExec sortExec = new SortExec(sortNode, child);
         ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode,
             subOp);
-        LOG.info("The planner chooses SortAggregationExec");
+        LOG.info("The planner chooses [Sort Aggregation]");
         return new SortAggregateExec(ctx, groupbyNode, sortExec);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 666d049..694602b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -84,7 +84,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
     if (outerTupleSlots.isEmpty()) {
       for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-        Tuple t = outerChild.next();
+        Tuple t = leftChild.next();
         if (t == null) {
           outerEnd = true;
           break;
@@ -97,7 +97,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
     if (innerTupleSlots.isEmpty()) {
       for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-        Tuple t = innerChild.next();
+        Tuple t = rightChild.next();
         if (t == null) {
           innerEnd = true;
           break;
@@ -107,7 +107,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
       innerIterator = innerTupleSlots.iterator();
     }
 
-    if((innext = innerChild.next()) == null){
+    if((innext = rightChild.next()) == null){
       innerEnd = true;
     }
 
@@ -118,7 +118,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
           innerIterator = innerTupleSlots.iterator();
         } else {
           if (innerEnd) {
-            innerChild.rescan();
+            rightChild.rescan();
             innerEnd = false;
             
             if (outerEnd) {
@@ -126,7 +126,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
             }
             outerTupleSlots.clear();
             for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-              Tuple t = outerChild.next();
+              Tuple t = leftChild.next();
               if (t == null) {
                 outerEnd = true;
                 break;
@@ -148,7 +148,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
           if (innext != null) {
             innerTupleSlots.add(innext);
             for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill inner
-              Tuple t = innerChild.next();
+              Tuple t = rightChild.next();
               if (t == null) {
                 innerEnd = true;
                 break;
@@ -157,7 +157,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
             }
           } else {
             for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill inner
-              Tuple t = innerChild.next();
+              Tuple t = rightChild.next();
               if (t == null) {
                 innerEnd = true;
                 break;
@@ -166,7 +166,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
             }
           }
           
-          if ((innext = innerChild.next()) == null) {
+          if ((innext = rightChild.next()) == null) {
             innerEnd = true;
           }
           innerIterator = innerTupleSlots.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index a17ddf4..e98d505 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -24,40 +24,40 @@ import org.apache.tajo.catalog.Schema;
 import java.io.IOException;
 
 public abstract class BinaryPhysicalExec extends PhysicalExec {
-  protected final PhysicalExec outerChild;
-  protected final PhysicalExec innerChild;
+  protected final PhysicalExec leftChild;
+  protected final PhysicalExec rightChild;
 
   public BinaryPhysicalExec(final TaskAttemptContext context,
                             final Schema inSchema, final Schema outSchema,
                             final PhysicalExec outer, final PhysicalExec inner) {
     super(context, inSchema, outSchema);
-    this.outerChild = outer;
-    this.innerChild = inner;
+    this.leftChild = outer;
+    this.rightChild = inner;
   }
 
-  public PhysicalExec getOuterChild() {
-    return outerChild;
+  public PhysicalExec getLeftChild() {
+    return leftChild;
   }
 
-  public PhysicalExec getInnerChild() {
-    return innerChild;
+  public PhysicalExec getRightChild() {
+    return rightChild;
   }
 
   @Override
   public void init() throws IOException {
-    outerChild.init();
-    innerChild.init();
+    leftChild.init();
+    rightChild.init();
   }
 
   @Override
   public void rescan() throws IOException {
-    outerChild.rescan();
-    innerChild.rescan();
+    leftChild.rescan();
+    rightChild.rescan();
   }
 
   @Override
   public void close() throws IOException {
-    outerChild.close();
-    innerChild.close();
+    leftChild.close();
+    rightChild.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
new file mode 100644
index 0000000..8f2d115
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+public class HashAntiJoinExec extends HashJoinExec { // hash-based anti-join: emits left tuples that have no matching right tuple
+  private Tuple rightNullTuple; // tuple filled with NullDatum, framed next to every emitted left tuple
+
+  public HashAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
+                      PhysicalExec right) {
+    super(context, plan, left, right);
+    // Build the all-null tuple that stands in for the right side of unmatched rows.
+    rightNullTuple = new VTuple(leftChild.outColumnNum); // NOTE(review): sized by the LEFT child's column count; rightChild's may be intended - confirm
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * Returns the next left tuple that has no matching right tuple. Null is returned once the left child is
+   * exhausted. While the inherited 'first' flag is set, loadRightToHashTable() is invoked before probing.
+   *
+   * For each left tuple, next() looks up its join key in the hash table. If no bucket exists for the key,
+   * the left tuple is emitted immediately. Otherwise the bucket is scanned sequentially, and the left tuple
+   * is emitted only if no right tuple in the bucket satisfies the join qualifier; matched tuples are skipped.
+   *
+   * @return the projection of an unmatched left tuple framed with the null tuple, or null at end of input
+   * @throws IOException propagated from the child executors
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable(); // fills tupleSlots from rightChild; presumably also clears 'first' (not visible in this hunk - confirm)
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // fetch the next probe tuple from the left child
+      leftTuple = leftChild.next();
+      if (leftTuple == null) { // the left child is exhausted, so the join is complete
+        finished = true;
+        return null;
+      }
+
+      // Look up the left tuple's join key in the in-memory hash table.
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      if (tupleSlots.containsKey(leftKeyTuple)) {
+        // a bucket exists for this key; its tuples are checked against the join qualifier below
+        iterator = tupleSlots.get(leftKeyTuple).iterator();
+      } else {
+        // no bucket for this key: the left tuple is unmatched, so emit it right away
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        return outTuple;
+      }
+
+      // Reached only when a bucket was found. Scan its tuples until one satisfies the join qualifier
+      // or the bucket is exhausted; the scan stops at the first match.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) { // a matching right tuple exists; this left tuple is not emitted
+          notFound = false;
+        }
+      }
+
+      if (notFound) { // nothing in the bucket matched: emit the left tuple framed with the null tuple
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        break;
+      }
+    }
+
+    return outTuple; // NOTE(review): also reached when 'finished' was already set on entry; outTuple is then returned as-is, not null
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index cce5587..355b357 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -35,30 +35,30 @@ import java.util.*;
 
 public class HashJoinExec extends BinaryPhysicalExec {
   // from logical plan
-  private JoinNode plan;
-  private EvalNode joinQual;
+  protected JoinNode plan;
+  protected EvalNode joinQual;
 
-  private List<Column[]> joinKeyPairs;
+  protected List<Column[]> joinKeyPairs;
 
   // temporal tuples and states for nested loop join
-  private boolean first = true;
-  private FrameTuple frameTuple;
-  private Tuple outTuple = null;
-  private Map<Tuple, List<Tuple>> tupleSlots;
-  private Iterator<Tuple> iterator = null;
-  private EvalContext qualCtx;
-  private Tuple outerTuple;
-  private Tuple outerKeyTuple;
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
 
-  private int [] outerKeyList;
-  private int [] innerKeyList;
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
 
-  private boolean finished = false;
-  boolean nextOuter = true;
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
 
   // projection
-  private final Projector projector;
-  private final EvalContext [] evalContexts;
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
 
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
       PhysicalExec inner) {
@@ -72,15 +72,15 @@ public class HashJoinExec extends BinaryPhysicalExec {
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
         outer.getSchema(), inner.getSchema());
 
-    outerKeyList = new int[joinKeyPairs.size()];
-    innerKeyList = new int[joinKeyPairs.size()];
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
 
     for (int i = 0; i < joinKeyPairs.size(); i++) {
-      outerKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
     }
 
     for (int i = 0; i < joinKeyPairs.size(); i++) {
-      innerKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
     }
 
     // for projection
@@ -90,56 +90,56 @@ public class HashJoinExec extends BinaryPhysicalExec {
     // for join
     frameTuple = new FrameTuple();
     outTuple = new VTuple(outSchema.getColumnNum());
-    outerKeyTuple = new VTuple(outerKeyList.length);
+    leftKeyTuple = new VTuple(leftKeyList.length);
   }
 
-  private void getKeyOuterTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < outerKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(outerKeyList[i]));
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
     }
   }
 
   public Tuple next() throws IOException {
     if (first) {
-      loadInnerTable();
+      loadRightToHashTable();
     }
 
-    Tuple innerTuple;
+    Tuple rightTuple;
     boolean found = false;
 
     while(!finished) {
 
-      if (nextOuter) {
+      if (shouldGetLeftTuple) { // initially, it is true.
         // getting new outer
-        outerTuple = outerChild.next();
-        if (outerTuple == null) {
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
           finished = true;
           return null;
         }
 
-        // getting corresponding inner
-        getKeyOuterTuple(outerTuple, outerKeyTuple);
-        if (tupleSlots.containsKey(outerKeyTuple)) {
-          iterator = tupleSlots.get(outerKeyTuple).iterator();
-          nextOuter = false;
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
         } else {
-          nextOuter = true;
+          shouldGetLeftTuple = true;
           continue;
         }
       }
 
-      // getting next inner tuple
-      innerTuple = iterator.next();
-      frameTuple.set(outerTuple, innerTuple);
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
       joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) {
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
         projector.eval(evalContexts, frameTuple);
         projector.terminate(evalContexts, outTuple);
         found = true;
       }
 
-      if (!iterator.hasNext()) { // no more inner tuple
-        nextOuter = true;
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
       }
 
       if (found) {
@@ -150,15 +150,15 @@ public class HashJoinExec extends BinaryPhysicalExec {
     return outTuple;
   }
 
-  private void loadInnerTable() throws IOException {
+  protected void loadRightToHashTable() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
 
-    while ((tuple = innerChild.next()) != null) {
+    while ((tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
       List<Tuple> newValue;
-      for (int i = 0; i < innerKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(innerKeyList[i]));
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
       }
 
       if (tupleSlots.containsKey(keyTuple)) {
@@ -183,7 +183,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
 
     finished = false;
     iterator = null;
-    nextOuter = true;
+    shouldGetLeftTuple = true;
   }
 
   public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 0179da5..1820c32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -96,7 +96,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     outTuple = new VTuple(outSchema.getColumnNum());
   }
 
-  public JoinNode getJoinNode(){
+  public JoinNode getPlan(){
     return this.joinNode;
   }
 
@@ -110,10 +110,10 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         }
 
         if(outerTuple == null){
-          outerTuple = outerChild.next();
+          outerTuple = leftChild.next();
         }
         if(innerTuple == null){
-          innerTuple = innerChild.next();
+          innerTuple = rightChild.next();
         }
 
         outerTupleSlots.clear();
@@ -122,9 +122,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         int cmp;
         while ((cmp = joincomparator.compare(outerTuple, innerTuple)) != 0) {
           if (cmp > 0) {
-            innerTuple = innerChild.next();
+            innerTuple = rightChild.next();
           } else if (cmp < 0) {
-            outerTuple = outerChild.next();
+            outerTuple = leftChild.next();
           }
           if (innerTuple == null || outerTuple == null) {
             return null;
@@ -134,7 +134,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         previous = outerTuple;
         do {
           outerTupleSlots.add(outerTuple);
-          outerTuple = outerChild.next();
+          outerTuple = leftChild.next();
           if (outerTuple == null) {
             end = true;
             break;
@@ -146,7 +146,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         previous = innerTuple;
         do {
           innerTupleSlots.add(innerTuple);
-          innerTuple = innerChild.next();
+          innerTuple = rightChild.next();
           if (innerTuple == null) {
             end = true;
             break;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 3ce25ea..15d8ff9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -74,17 +74,17 @@ public class NLJoinExec extends BinaryPhysicalExec {
   public Tuple next() throws IOException {
     for (;;) {
       if (needNewOuter) {
-        outerTuple = outerChild.next();
+        outerTuple = leftChild.next();
         if (outerTuple == null) {
           return null;
         }
         needNewOuter = false;
       }
 
-      innerTuple = innerChild.next();
+      innerTuple = rightChild.next();
       if (innerTuple == null) {
         needNewOuter = true;
-        innerChild.rescan();
+        rightChild.rescan();
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
index 7a757ca..6af08a5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
@@ -42,7 +42,7 @@ public class UnionExec extends BinaryPhysicalExec {
   @Override
   public Tuple next() throws IOException {
     if (nextOuter) {
-      tuple = outerChild.next();
+      tuple = leftChild.next();
       if (tuple == null) {
        nextOuter = false; 
       } else {
@@ -50,7 +50,7 @@ public class UnionExec extends BinaryPhysicalExec {
       }
     }
     
-    return innerChild.next();
+    return rightChild.next();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 1ae4c1c..75d13ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -115,7 +115,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
       } else {
         joinNode.setJoinQual(qual);
       }
-      if (joinNode.getJoinType() == JoinType.CROSS_JOIN) {
+      if (joinNode.getJoinType() == JoinType.CROSS) {
         joinNode.setJoinType(JoinType.INNER);
       }
       cnf.removeAll(matched);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index c5b1c05..bea1eb6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -129,7 +129,7 @@ public class TestBNLJoinExec {
           "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
 
   @Test
-  public final void testCrossJoin() throws IOException {
+  public final void testBNLCrossJoin() throws IOException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestBNLJoinExec {
 
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         TUtil.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
@@ -148,8 +148,8 @@ public class TestBNLJoinExec {
 
     ProjectionExec proj = (ProjectionExec) exec;
     NLJoinExec nlJoin = (NLJoinExec) proj.getChild();
-    SeqScanExec scanOuter = (SeqScanExec) nlJoin.getOuterChild();
-    SeqScanExec scanInner = (SeqScanExec) nlJoin.getInnerChild();
+    SeqScanExec scanOuter = (SeqScanExec) nlJoin.getLeftChild();
+    SeqScanExec scanInner = (SeqScanExec) nlJoin.getRightChild();
 
     BNLJoinExec bnl = new BNLJoinExec(ctx, nlJoin.getPlan(), scanOuter, scanInner);
     proj.setChild(bnl);
@@ -164,7 +164,7 @@ public class TestBNLJoinExec {
   }
 
   @Test
-  public final void testInnerJoin() throws IOException {
+  public final void testBNLInnerJoin() throws IOException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -172,7 +172,7 @@ public class TestBNLJoinExec {
 
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
     TaskAttemptContext ctx =
         new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
             merged, workDir);
@@ -189,15 +189,15 @@ public class TestBNLJoinExec {
     JoinNode joinNode = null;
     if (proj.getChild() instanceof MergeJoinExec) {
       MergeJoinExec join = (MergeJoinExec) proj.getChild();
-      ExternalSortExec sortOut = (ExternalSortExec) join.getOuterChild();
-      ExternalSortExec sortIn = (ExternalSortExec) join.getInnerChild();
+      ExternalSortExec sortOut = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortIn = (ExternalSortExec) join.getRightChild();
       scanOuter = (SeqScanExec) sortOut.getChild();
       scanInner = (SeqScanExec) sortIn.getChild();
-      joinNode = join.getJoinNode();
+      joinNode = join.getPlan();
     } else if (proj.getChild() instanceof HashJoinExec) {
       HashJoinExec join = (HashJoinExec) proj.getChild();
-      scanOuter = (SeqScanExec) join.getOuterChild();
-      scanInner = (SeqScanExec) join.getInnerChild();
+      scanOuter = (SeqScanExec) join.getLeftChild();
+      scanInner = (SeqScanExec) join.getRightChild();
       joinNode = join.getPlan();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
new file mode 100644
index 0000000..a429cf2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit test for {@link HashAntiJoinExec} over small CSV-backed employee and people tables. */
+public class TestHashAntiJoinExec {
+  private TajoConf conf;
+  private static final String TEST_PATH = "target/test-data/TestHashAntiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /** Writes the employee (empId 0-9) and people (odd empId 1-9) CSV tables and registers them. */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema, StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  /** Replaces the planned equi-join with a hash anti-join; only employees with no match in people remain. */
+  @Test
+  public final void testHashAntiJoin() throws IOException, PlanningException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace the planned equi-join with a hash anti join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+      exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+    }
+    assertTrue("planned join was not replaced by HashAntiJoinExec", exec instanceof HashAntiJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertEquals(i, tuple.getInt(0).asInt4());
+      assertEquals(i, tuple.getInt(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
+      assertEquals("dept_" + i, tuple.getString(2).asChars());
+      assertEquals(10 + i, tuple.getInt(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result : [0, 2, 4, 6, 8]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 591f431..9422358 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -129,7 +129,7 @@ public class TestHashJoinExec {
   };
 
   @Test
-  public final void testInnerJoin() throws IOException {
+  public final void testHashInnerJoin() throws IOException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestHashJoinExec {
 
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         TUtil.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
@@ -149,12 +149,12 @@ public class TestHashJoinExec {
     ProjectionExec proj = (ProjectionExec) exec;
     if (proj.getChild() instanceof MergeJoinExec) {
       MergeJoinExec join = (MergeJoinExec) proj.getChild();
-      ExternalSortExec sortout = (ExternalSortExec) join.getOuterChild();
-      ExternalSortExec sortin = (ExternalSortExec) join.getInnerChild();
+      ExternalSortExec sortout = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortin = (ExternalSortExec) join.getRightChild();
       SeqScanExec scanout = (SeqScanExec) sortout.getChild();
       SeqScanExec scanin = (SeqScanExec) sortin.getChild();
 
-      HashJoinExec hashjoin = new HashJoinExec(ctx, join.getJoinNode(), scanout, scanin);
+      HashJoinExec hashjoin = new HashJoinExec(ctx, join.getPlan(), scanout, scanin);
       proj.setChild(hashjoin);
 
       exec = proj;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index b85d1fa..9d85970 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -145,7 +145,7 @@ public class TestMergeJoinExec {
   };
 
   @Test
-  public final void testInnerJoin() throws IOException {
+  public final void testMergeInnerJoin() throws IOException {
     Fragment[] empFrags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = sm.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -153,7 +153,7 @@ public class TestMergeJoinExec {
 
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         TUtil.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
@@ -167,8 +167,8 @@ public class TestMergeJoinExec {
     // TODO - should be planed with user's optimization hint
     if (!(proj.getChild() instanceof MergeJoinExec)) {
       BinaryPhysicalExec nestedLoopJoin = (BinaryPhysicalExec) proj.getChild();
-      SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getOuterChild();
-      SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getInnerChild();
+      SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getLeftChild();
+      SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getRightChild();
 
       SeqScanExec tmp;
       if (!outerScan.getTableName().equals("employee")) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0f3965a0/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index c66b0ca..7235924 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -129,7 +129,7 @@ public class TestNLJoinExec {
   };
   
   @Test
-  public final void testCrossJoin() throws IOException {
+  public final void testNLCrossJoin() throws IOException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -137,7 +137,7 @@ public class TestNLJoinExec {
     
     Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         TUtil.newQueryUnitAttemptId(), merged, workDir);
     Expr context = analyzer.parse(QUERIES[0]);
@@ -156,7 +156,7 @@ public class TestNLJoinExec {
   }
 
   @Test
-  public final void testInnerJoin() throws IOException {
+  public final void testNLInnerJoin() throws IOException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -164,7 +164,7 @@ public class TestNLJoinExec {
     
     Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
 
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         TUtil.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[1]);