You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/07 10:26:47 UTC

[2/2] git commit: TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)

TAJO-232: Rename join operators and add other join operators to PhysicalPlanner. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d7645252
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d7645252
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d7645252

Branch: refs/heads/master
Commit: d7645252fb6bc3152d9774c6a655b8c083ff21b6
Parents: 1023c82
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 7 17:02:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Oct 7 17:03:07 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tajo/algebra/BinaryOperator.java |   8 +-
 .../apache/tajo/algebra/ExistsPredicate.java    |  44 +++
 .../java/org/apache/tajo/algebra/OpType.java    |   4 +-
 .../org/apache/tajo/algebra/RelationList.java   |   4 +-
 .../tajo/algebra/SimpleTableSubQuery.java       |  41 +++
 .../tajo/algebra/TablePrimarySubQuery.java      |  54 +++
 .../org/apache/tajo/algebra/TableSubQuery.java  |  54 ---
 .../org/apache/tajo/catalog/TestCatalog.java    |   2 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   3 +-
 .../tajo/engine/parser/HiveConverter.java       |   2 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  23 +-
 .../tajo/engine/planner/AlgebraVisitor.java     |   2 +-
 .../tajo/engine/planner/BaseAlgebraVisitor.java |   6 +-
 .../tajo/engine/planner/LogicalPlanner.java     |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 288 +++++++++++-----
 .../planner/logical/TableSubQueryNode.java      |   2 +-
 .../physical/BasicPhysicalExecutorVisitor.java  |  12 +-
 .../planner/physical/FullOuterHashJoinExec.java | 252 --------------
 .../physical/FullOuterMergeJoinExec.java        | 334 -------------------
 .../planner/physical/HashAntiJoinExec.java      | 105 ------
 .../planner/physical/HashFullOuterJoinExec.java | 252 ++++++++++++++
 .../planner/physical/HashLeftAntiJoinExec.java  | 110 ++++++
 .../planner/physical/HashLeftOuterJoinExec.java | 214 ++++++++++++
 .../planner/physical/HashLeftSemiJoinExec.java  | 107 ++++++
 .../planner/physical/HashSemiJoinExec.java      | 101 ------
 .../planner/physical/LeftOuterHashJoinExec.java | 214 ------------
 .../planner/physical/LeftOuterNLJoinExec.java   | 129 -------
 .../physical/MergeFullOuterJoinExec.java        | 334 +++++++++++++++++++
 .../planner/physical/NLLeftOuterJoinExec.java   | 129 +++++++
 .../physical/PhysicalExecutorVisitor.java       |   6 +-
 .../tajo/engine/parser/TestSQLAnalyzer.java     |  24 ++
 .../physical/TestFullOuterHashJoinExec.java     |   9 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  14 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   8 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   8 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  11 +-
 .../physical/TestLeftOuterNLJoinExec.java       |  40 +--
 .../physical/TestRightOuterHashJoinExec.java    |   2 +-
 .../src/test/queries/exists_predicate_1.sql     |   1 +
 .../src/test/queries/exists_predicate_2.sql     |   1 +
 .../src/test/queries/in_subquery_1.sql          |   1 +
 .../src/test/queries/in_subquery_2.sql          |   1 +
 43 files changed, 1600 insertions(+), 1361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f990530..14f935a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-232: Rename join operators and add other join operators to
+    PhysicalPlanner. (hyunsik)
+
     TAJO-229: Implement JoinGraph to represent a graph of relation joins.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
index 3e00e5e..a090623 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/BinaryOperator.java
@@ -32,16 +32,16 @@ public class BinaryOperator extends Expr {
     this.right = right;
   }
 
-  public Expr getLeft() {
-    return this.left;
+  public <T extends Expr> T getLeft() {
+    return (T) this.left;
   }
 
   public void setLeft(Expr left) {
     this.left = left;
   }
 
-  public Expr getRight() {
-    return this.right;
+  public <T extends Expr> T getRight() {
+    return (T) this.right;
   }
 
   public void setRight(Expr right) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
new file mode 100644
index 0000000..0823807
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/ExistsPredicate.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class ExistsPredicate extends Expr { // algebra node for "[NOT] EXISTS (subquery)"
+  private SimpleTableSubQuery simpleTableSubQuery;
+  private boolean not; // true when the predicate is negated (NOT EXISTS)
+
+  public ExistsPredicate(SimpleTableSubQuery simpleTableSubQuery, boolean not) {
+    super(OpType.ExistsPredicate); // was OpType.InPredicate; must be the OpType entry bound to this class
+    this.simpleTableSubQuery = simpleTableSubQuery;
+    this.not = not;
+  }
+
+  public boolean isNot() {
+    return this.not;
+  }
+
+  public SimpleTableSubQuery getSubQuery() {
+    return simpleTableSubQuery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) { // assumes the caller has already verified that expr is an ExistsPredicate
+    ExistsPredicate another = (ExistsPredicate) expr;
+    return not == another.not && simpleTableSubQuery.equals(another.simpleTableSubQuery);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
index 145c52c..799b196 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
@@ -28,7 +28,8 @@ public enum OpType {
   Relation(Relation.class),
   RelationList(RelationList.class),
   Rename,
-  TableSubQuery(TableSubQuery.class),
+  SimpleTableSubQuery(SimpleTableSubQuery.class),
+  TablePrimaryTableSubQuery(TablePrimarySubQuery.class),
   Except(SetOperation.class),
   Having(Having.class),
 	Aggregation(Aggregation.class),
@@ -66,6 +67,7 @@ public enum OpType {
   InPredicate(InPredicate.class),
   ValueList(ValueListExpr.class),
   Is,
+  ExistsPredicate(ExistsPredicate.class),
 
   // string operator or pattern matching predicates
   LikePredicate(PatternMatchPredicate.class),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
index 8e9f878..918ff46 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/RelationList.java
@@ -33,8 +33,8 @@ public class RelationList extends Expr {
       Preconditions.checkArgument(
           rel.getType() == OpType.Relation ||
           rel.getType() == OpType.Join ||
-          rel.getType() == OpType.TableSubQuery,
-          "Only Relation, Join, or TableSubQuery can be given to RelationList, but this expr "
+          rel.getType() == OpType.TablePrimaryTableSubQuery,
+          "Only Relation, Join, or TablePrimarySubQuery can be given to RelationList, but this expr "
               + " is " + rel.getType());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
new file mode 100644
index 0000000..91442dc
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/SimpleTableSubQuery.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class SimpleTableSubQuery extends Expr { // wraps a subquery Expr; SQLAnalyzer builds it for IN and EXISTS
+  private Expr subquery;
+
+  public SimpleTableSubQuery(Expr subquery) {
+    super(OpType.SimpleTableSubQuery);
+    this.subquery = subquery;
+  }
+
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) { // assumes the caller has already checked expr's type, as ExistsPredicate.equalsTo does
+    return subquery.equals(((SimpleTableSubQuery) expr).subquery); // compare with the OTHER node, not with itself
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
new file mode 100644
index 0000000..929e9b2
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TablePrimarySubQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+public class TablePrimarySubQuery extends Relation { // a named subquery used as a relation (derived table)
+  private Expr subquery;
+  private String [] columnNames; // optional column aliases; null when none were set
+
+  public TablePrimarySubQuery(String relName, Expr subquery) {
+    super(OpType.TablePrimaryTableSubQuery, relName);
+    this.subquery = subquery;
+  }
+
+  public boolean hasColumnNames() {
+    return this.columnNames != null;
+  }
+
+  public void setColumnNames(String[] aliasList) {
+    this.columnNames = aliasList;
+  }
+
+  public String [] getColumnNames() {
+    return columnNames;
+  }
+
+  public Expr getSubQuery() {
+    return subquery;
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) { // assumes the caller has already checked expr's type, as ExistsPredicate.equalsTo does
+    return subquery.equals(((TablePrimarySubQuery) expr).subquery); // compare with the OTHER node, not with itself
+  }
+
+  public String toJson() {
+    return JsonHelper.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
deleted file mode 100644
index 55c44ba..0000000
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/TableSubQuery.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.algebra;
-
-public class TableSubQuery extends Relation {
-  private Expr subquery;
-  private String [] columnNames;
-
-  public TableSubQuery(String relName, Expr subquery) {
-    super(OpType.TableSubQuery, relName);
-    this.subquery = subquery;
-  }
-
-  public boolean hasColumnNames() {
-    return this.columnNames != null;
-  }
-
-  public void setColumnNames(String[] aliasList) {
-    this.columnNames = aliasList;
-  }
-
-  public String [] getColumnNames() {
-    return columnNames;
-  }
-
-  public Expr getSubQuery() {
-    return subquery;
-  }
-
-  @Override
-  boolean equalsTo(Expr expr) {
-    return subquery.equals(subquery);
-  }
-
-  public String toJson() {
-    return JsonHelper.toJson(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index fd1893b..97824b2 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -48,7 +48,7 @@ public class TestCatalog {
 	public static void setUp() throws Exception {
     TajoConf conf = new TajoConf();
 
-    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db");
+    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/TestCatalog/db;create=true");
     conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "127.0.0.1:0");
 
 	  server = new CatalogServer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 6fc8166..1c6816a 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -831,6 +831,7 @@ predicate
   | in_predicate
   | pattern_matching_predicate
   | null_predicate
+  | exists_predicate
   ;
 
 /*
@@ -944,7 +945,7 @@ some : SOME | ANY;
 */
 
 exists_predicate
-  : EXISTS s=table_subquery
+  : NOT? EXISTS s=table_subquery
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
index 0442e77..135d146 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
@@ -501,7 +501,7 @@ public class HiveConverter extends HiveParserBaseVisitor<Expr>{
                 }
             }
 
-            TableSubQuery subQuery = new TableSubQuery(tableAlias, current);
+            TablePrimarySubQuery subQuery = new TablePrimarySubQuery(tableAlias, current);
             current = subQuery;
         }
         // TODO: implement lateralView

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 021e0ba..49494a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -400,7 +400,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       }
       return relation;
     } else if (ctx.derived_table() != null) {
-      return new TableSubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
+      return new TablePrimarySubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
     } else {
       return null;
     }
@@ -628,13 +628,17 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   }
 
   @Override
-  public ValueListExpr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) {
-    int size = ctx.in_value_list().row_value_expression().size();
-    Expr [] exprs = new Expr[size];
-    for (int i = 0; i < size; i++) {
-      exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+  public Expr visitIn_predicate_value(SQLParser.In_predicate_valueContext ctx) {
+    if (checkIfExist(ctx.in_value_list())) {
+      int size = ctx.in_value_list().row_value_expression().size();
+      Expr [] exprs = new Expr[size];
+      for (int i = 0; i < size; i++) {
+        exprs[i] = visitRow_value_expression(ctx.in_value_list().row_value_expression(i));
+      }
+      return new ValueListExpr(exprs);
+    } else {
+      return new SimpleTableSubQuery(visitChildren(ctx.table_subquery()));
     }
-    return new ValueListExpr(exprs);
   }
 
   @Override
@@ -692,6 +696,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   }
 
   @Override
+  public ExistsPredicate visitExists_predicate(SQLParser.Exists_predicateContext ctx) {
+    return new ExistsPredicate(new SimpleTableSubQuery(visitTable_subquery(ctx.table_subquery())), ctx.NOT() != null);
+  }
+
+  @Override
   public ColumnReferenceExpr visitColumn_reference(SQLParser.Column_referenceContext ctx) {
     ColumnReferenceExpr column = new ColumnReferenceExpr(ctx.name.getText());
     if (ctx.tb_name != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
index 633609a..345320c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
@@ -34,7 +34,7 @@ public interface AlgebraVisitor<CONTEXT, RESULT> {
   RESULT visitExcept(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
   RESULT visitIntersect(CONTEXT ctx, Stack<OpType> stack, SetOperation expr) throws PlanningException;
   RESULT visitRelationList(CONTEXT ctx, Stack<OpType> stack, RelationList expr) throws PlanningException;
-  RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException;
+  RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException;
   RESULT visitRelation(CONTEXT ctx, Stack<OpType> stack, Relation expr) throws PlanningException;
   RESULT visitCreateTable(CONTEXT ctx, Stack<OpType> stack, CreateTable expr) throws PlanningException;
   RESULT visitDropTable(CONTEXT ctx, Stack<OpType> stack, DropTable expr) throws PlanningException;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 06ab38c..f974e1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -82,8 +82,8 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
       case RelationList:
         current = visitRelationList(ctx, stack, (RelationList) expr);
         break;
-      case TableSubQuery:
-        current = visitTableSubQuery(ctx, stack, (TableSubQuery) expr);
+      case TablePrimaryTableSubQuery:
+        current = visitTableSubQuery(ctx, stack, (TablePrimarySubQuery) expr);
         break;
       case Relation:
         current = visitRelation(ctx, stack, (Relation) expr);
@@ -209,7 +209,7 @@ public abstract class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisi
   }
 
   @Override
-  public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+  public RESULT visitTableSubQuery(CONTEXT ctx, Stack<OpType> stack, TablePrimarySubQuery expr) throws PlanningException {
     stack.push(expr.getType());
     RESULT child = visitChild(ctx, stack, expr.getSubQuery());
     stack.pop();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 359dda3..d083ac1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -145,7 +145,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
   }
 
-  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr)
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TablePrimarySubQuery expr)
       throws PlanningException {
     QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
     PlanContext newContext = new PlanContext(context.plan, newBlock);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c6e4695..14514a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -211,34 +211,150 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_SEMI:
+        return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_SEMI:
+        return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_ANTI:
+        return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_ANTI:
+        return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
 
       default:
         throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
     }
   }
 
-  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          // fallback algorithm
+          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+      }
+
+    } else {
+      return new BNLJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    // An enforced algorithm (if any) wins; otherwise createBestInnerJoinPlan picks one.
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+          return new HashJoinExec(context, plan, leftExec, rightExec);
+        case MERGE_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+        case HYBRID_HASH_JOIN:
+          // No dedicated executor is wired up for this case; it falls through to the merge-join fallback.
+        default:
+          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+      }
+
+
+    } else {
+      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long leftSize = estimateSizeRecursive(context, leftLineage);
+    long rightSize = estimateSizeRecursive(context, rightLineage);
+    // Picks hash join when either estimated input size is under the configured threshold, else merge join.
+    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
+
+    boolean hashJoin = false;
+    if (leftSize < threshold || rightSize < threshold) {
+      hashJoin = true;
+    }
+
+    if (hashJoin) {
+      PhysicalExec selectedOuter;
+      PhysicalExec selectedInner;
+
+      // HashJoinExec loads the inner relation to memory.
+      if (leftSize <= rightSize) {
+        selectedInner = leftExec;
+        selectedOuter = rightExec;
+      } else {
+        selectedInner = rightExec;
+        selectedOuter = leftExec;
+      }
+      // NOTE(review): operands may be swapped relative to the plan's left/right here - confirm HashJoinExec tolerates that.
+      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
+      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+    } else {
+      return createMergeInnerJoin(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
+                                             PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
+        leftExec);
+    ExternalSortExec innerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
+        rightExec);
+    // Both inputs are wrapped in external sorts (sortSpecs[0] for left, sortSpecs[1] for right) before merging.
+    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+  }
+
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
         case NESTED_LOOP_JOIN:
           //the right operand is too large, so we opt for NL implementation of left outer join
           LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-          return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+          return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
         default:
           LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
       }
     } else {
       return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
@@ -253,12 +369,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     if (rightTableVolume < threshold) {
       // we can implement left outer join using hash join, using the right operand as the build relation
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+      return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
     }
     else {
       //the right operand is too large, so we opt for NL implementation of left outer join
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-      return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+      return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
@@ -270,7 +386,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     long outerSize4 = estimateSizeRecursive(context, outerLineage4);
     if (outerSize4 < threshold){
       LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+      return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
     } else {
       return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
     }
@@ -290,7 +406,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   }
 
   private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
@@ -298,7 +414,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
         case MERGE_JOIN:
           return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
         default:
@@ -312,7 +428,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   }
 
   private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
     if (property != null) {
@@ -334,7 +450,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                             PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
     String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
@@ -354,10 +470,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       selectedRight = leftExec;
     }
     LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
-    return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+    return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
   }
 
-  private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                               PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
     // if size too large, full outer merge join implementation
@@ -369,7 +485,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
         new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
 
-    return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+    return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
   }
 
   private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -386,114 +502,104 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+  /**
+   *  Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
+   */
+  private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+
         default:
-          // fallback algorithm
-          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
       }
-
     } else {
-      return new BNLJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+  /**
+   *  Right semi join mirrors left semi join, where the left side is the FROM side table and the right side is the IN side table.
+   */
+  private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
-
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
-
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
-          return new HashJoinExec(context, plan, leftExec, rightExec);
-        case MERGE_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
-          return createMergeJoin(context, plan, leftExec, rightExec);
-        case HYBRID_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
 
         default:
-          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
-          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
-          return createMergeJoin(context, plan, leftExec, rightExec);
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
       }
-
-
     } else {
-      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
     }
   }
 
-  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
-                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
-    long leftSize = estimateSizeRecursive(context, leftLineage);
-    long rightSize = estimateSizeRecursive(context, rightLineage);
-
-    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
-
-    boolean hashJoin = false;
-    if (leftSize < threshold || rightSize < threshold) {
-      hashJoin = true;
-    }
-
-    if (hashJoin) {
-      PhysicalExec selectedOuter;
-      PhysicalExec selectedInner;
+  /**
+   *  Left anti join means that the left side is the FROM side table, and the right side is the IN side table.
+   */
+  private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
 
-      // HashJoinExec loads the inner relation to memory.
-      if (leftSize <= rightSize) {
-        selectedInner = leftExec;
-        selectedOuter = rightExec;
-      } else {
-        selectedInner = rightExec;
-        selectedOuter = leftExec;
+        default:
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
       }
-
-      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
-      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
     } else {
-      return createMergeJoin(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
-  private MergeJoinExec createMergeJoin(TaskAttemptContext context, JoinNode plan,
-                                        PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
-        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
-    ExternalSortExec outerSort = new ExternalSortExec(context, sm,
-        new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
-        leftExec);
-    ExternalSortExec innerSort = new ExternalSortExec(context, sm,
-        new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
-        rightExec);
+  /**
+   *  Right anti join mirrors left anti join, where the left side is the FROM side table and the right side is the IN side table.
+   */
+  private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
 
-    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
-    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+        default:
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+      }
+    } else {
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+    }
   }
 
   public PhysicalExec createStorePlan(TaskAttemptContext ctx,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index 39ae1e2..846ed41 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -79,7 +79,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
 
   @Override
   public PlanString getPlanString() {
-    PlanString planStr = new PlanString("TableSubQuery");
+    PlanString planStr = new PlanString("TablePrimarySubQuery");
     planStr.appendTitle(" as ").appendTitle(tableName);
     return planStr;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index fc569d7..4723ecc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -38,10 +38,10 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
       return visitProjection((ProjectionExec) exec, stack, context);
     } else if (exec instanceof HashJoinExec) {
       return visitHashJoin((HashJoinExec) exec, stack, context);
-    } else if (exec instanceof HashAntiJoinExec) {
-      return visitHashAntiJoin((HashAntiJoinExec) exec, stack, context);
-    } else if (exec instanceof HashSemiJoinExec) {
-      return visitHashSemiJoin((HashSemiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashLeftAntiJoinExec) {
+      return visitHashAntiJoin((HashLeftAntiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashLeftSemiJoinExec) {
+      return visitHashSemiJoin((HashLeftSemiJoinExec) exec, stack, context);
     } else if (exec instanceof LimitExec) {
       return visitLimit((LimitExec) exec, stack, context);
     } else {
@@ -108,13 +108,13 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashSemiJoin(HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }
 
   @Override
-  public RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+  public RESULT visitHashAntiJoin(HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
       throws PhysicalPlanningException {
     return visitBinaryExecutor(exec, stack, context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
deleted file mode 100644
index aebfdb5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.*;
-
-
-public class FullOuterHashJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  protected JoinNode plan;
-  protected EvalNode joinQual;
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  // projection
-  protected final Projector projector;
-  protected final EvalContext [] evalContexts;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private Map<Tuple, Boolean> matched;
-
-  public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
-                               PhysicalExec inner) {
-    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
-        plan.getOutSchema(), outer, inner);
-    this.plan = plan;
-    this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
-    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
-    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
-    this.matched = new HashMap<Tuple, Boolean>(10000);
-
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
-        outer.getSchema(), inner.getSchema());
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-    leftKeyTuple = new VTuple(leftKeyList.length);
-
-    leftNumCols = outer.getSchema().getColumnNum();
-    rightNumCols = inner.getSchema().getColumnNum();
-  }
-
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
-  }
-
-  public Tuple getNextUnmatchedRight() {
-
-    List<Tuple> newValue;
-    Tuple returnedTuple;
-    // get a keyTUple from the matched hashmap with a boolean false value
-    for(Tuple aKeyTuple : matched.keySet()) {
-      if(matched.get(aKeyTuple) == false) {
-        newValue = tupleSlots.get(aKeyTuple);
-        returnedTuple = newValue.remove(0);
-        tupleSlots.put(aKeyTuple, newValue);
-
-        // after taking the last element from the list in tupleSlots, set flag true in matched as well
-        if(newValue.isEmpty()){
-          matched.put(aKeyTuple, true);
-        }
-
-        return returnedTuple;
-      }
-    }
-    return null;
-  }
-
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!finished) {
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
-          Tuple unmatchedRightTuple = getNextUnmatchedRight();
-          if( unmatchedRightTuple == null) {
-            finished = true;
-            outTuple = null;
-            return null;
-          } else {
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-
-            return outTuple;
-          }
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
-          iterator = tupleSlots.get(leftKeyTuple).iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
-          //output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          shouldGetLeftTuple = true;
-          return outTuple;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
-      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        found = true;
-        getKeyLeftTuple(leftTuple, leftKeyTuple);
-        matched.put(leftKeyTuple, true);
-      }
-
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
-      }
-
-      if (found) {
-        break;
-      }
-    }
-    return outTuple;
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-
-    while ((tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      List<Tuple> newValue;
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
-      }
-
-      if (tupleSlots.containsKey(keyTuple)) {
-        newValue = tupleSlots.get(keyTuple);
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-        matched.put(keyTuple,false);
-      }
-    }
-    first = false;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
-  }
-
-  public void close() throws IOException {
-    tupleSlots.clear();
-  }
-
-  public JoinNode getPlan() {
-    return this.plan;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
deleted file mode 100644
index 2c11fea..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  private JoinNode joinNode;
-  private EvalNode joinQual;
-  private EvalContext qualCtx;
-
-  // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
-  private Tuple leftTuple = null;
-  private Tuple rightTuple = null;
-  private Tuple outTuple = null;
-  private Tuple leftNext = null;
-
-  private final List<Tuple> leftTupleSlots;
-  private final List<Tuple> rightTupleSlots;
-
-  private JoinTupleComparator joincomparator = null;
-  private TupleComparator[] tupleComparator = null;
-
-  private final static int INITIAL_TUPLE_SLOT = 10000;
-
-  private boolean end = false;
-
-  // projection
-  private final Projector projector;
-  private final EvalContext [] evalContexts;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private int posRightTupleSlots = -1;
-  private int posLeftTupleSlots = -1;
-  boolean endInPopulationStage = false;
-  private boolean initRightDone = false;
-
-  public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
-                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
-    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
-    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
-        "but there is no join condition");
-    this.joinNode = plan;
-    this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
-
-    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    SortSpec[][] sortSpecs = new SortSpec[2][];
-    sortSpecs[0] = leftSortKey;
-    sortSpecs[1] = rightSortKey;
-
-    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
-        rightChild.getSchema(), sortSpecs);
-    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
-        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
-
-    // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.renew();
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.getColumnNum());
-
-    leftNumCols = leftChild.getSchema().getColumnNum();
-    rightNumCols = rightChild.getSchema().getColumnNum();
-  }
-
-  public JoinNode getPlan(){
-    return this.joinNode;
-  }
-
-  public Tuple next() throws IOException {
-    Tuple previous;
-
-    for (;;) {
-      boolean newRound = false;
-      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
-        newRound = true;
-      }
-      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
-        newRound = true;
-      }
-
-      if(newRound == true){
-
-        if (end) {
-
-          ////////////////////////////////////////////////////////////////////////
-          // FINALIZING STAGE
-          ////////////////////////////////////////////////////////////////////////
-          // the finalizing stage, where remaining tuples on the right are
-          // transformed into left-padded results while tuples on the left
-          // are transformed into right-padded results
-
-          // before exit, a left-padded tuple should be built for all remaining
-          // right side and a right-padded tuple should be built for all remaining
-          // left side
-
-          if (initRightDone == false) {
-            // maybe the left operand was empty => the right one didn't have the chance to initialize
-            rightTuple = rightChild.next();
-            initRightDone = true;
-          }
-
-          if((leftTuple == null) && (rightTuple == null)) {
-            return null;
-          }
-
-          if((leftTuple == null) && (rightTuple != null)){
-            // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            rightTuple = rightChild.next();
-            return outTuple;
-          }
-
-          if((leftTuple != null) && (rightTuple == null)){
-            // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-            frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            leftTuple = leftChild.next();
-            return outTuple;
-          }
-        } // if end
-
-        ////////////////////////////////////////////////////////////////////////
-        // INITIALIZING STAGE
-        ////////////////////////////////////////////////////////////////////////
-        // initializing stage, reading the first tuple on each side
-        if (leftTuple == null) {
-          leftTuple = leftChild.next();
-          if( leftTuple == null){
-            end = true;
-            continue;
-          }
-        }
-        if (rightTuple == null) {
-          rightTuple = rightChild.next();
-          initRightDone = true;
-          if (rightTuple == null) {
-            end = true;
-            continue;
-          }
-        }
-
-        // reset tuple slots for a new round
-        leftTupleSlots.clear();
-        rightTupleSlots.clear();
-        posRightTupleSlots = -1;
-        posLeftTupleSlots = -1;
-
-        ////////////////////////////////////////////////////////////////////////
-        // Comparison and Move Forward Stage
-        ////////////////////////////////////////////////////////////////////////
-        // advance alternatively on each side until a match is found
-        int cmp;
-        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
-
-          if (cmp > 0) {
-
-            //before getting a new tuple from the right,  a leftnullpadded tuple should be built
-            //output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // BEFORE RETURN, MOVE FORWARD
-            rightTuple = rightChild.next();
-            if(rightTuple == null) {
-              end = true;
-            }
-
-            return outTuple;
-
-          } else if (cmp < 0) {
-            // before getting a new tuple from the left,  a rightnullpadded tuple should be built
-            // output a tuple with the nulls padded rightTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-            frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            // we simulate we found a match, which is exactly the null padded one
-            // BEFORE RETURN, MOVE FORWARD
-            leftTuple = leftChild.next();
-            if(leftTuple == null) {
-              end = true;
-            }
-
-            return outTuple;
-
-          } // if (cmp < 0)
-        } //while
-
-
-        ////////////////////////////////////////////////////////////////////////
-        // SLOTS POPULATION STAGE
-        ////////////////////////////////////////////////////////////////////////
-        // once a match is found, retain all tuples with this key in tuple slots
-        // on each side
-        if(!end) {
-          endInPopulationStage = false;
-
-          boolean endLeft = false;
-          boolean endRight = false;
-
-          previous = new VTuple(leftTuple);
-          do {
-            leftTupleSlots.add(new VTuple(leftTuple));
-            leftTuple = leftChild.next();
-            if(leftTuple == null) {
-              endLeft = true;
-            }
-
-
-          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
-          posLeftTupleSlots = 0;
-
-
-          previous = new VTuple(rightTuple);
-          do {
-            rightTupleSlots.add(new VTuple(rightTuple));
-            rightTuple = rightChild.next();
-            if(rightTuple == null) {
-              endRight = true;
-            }
-
-          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
-          posRightTupleSlots = 0;
-
-          if ((endLeft == true) || (endRight == true)) {
-            end = true;
-            endInPopulationStage = true;
-          }
-
-        } // if end false
-      } // if newRound
-
-
-      ////////////////////////////////////////////////////////////////////////
-      // RESULTS STAGE
-      ////////////////////////////////////////////////////////////////////////
-      // now output result matching tuples from the slots
-      // if either we haven't reached end on neither side, or we did reach end
-      // on one(or both) sides but that happened in the slots population step
-      // (i.e. refers to next round)
-      if(!end || (end && endInPopulationStage)){
-        if(posLeftTupleSlots == 0){
-          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
-          posLeftTupleSlots = posLeftTupleSlots + 1;
-        }
-
-        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
-          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
-          posRightTupleSlots = posRightTupleSlots + 1;
-          frameTuple.set(leftNext, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
-          return outTuple;
-        } else {
-          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
-          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
-            //rewind the right slots position
-            posRightTupleSlots = 0;
-            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
-            posRightTupleSlots = posRightTupleSlots + 1;
-            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
-            posLeftTupleSlots = posLeftTupleSlots + 1;
-
-            frameTuple.set(leftNext, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
-            return outTuple;
-          }
-        }
-      } // the second if end false
-    } // for
-  }
-
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    leftTupleSlots.clear();
-    rightTupleSlots.clear();
-    posRightTupleSlots = -1;
-    posLeftTupleSlots = -1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
deleted file mode 100644
index 8f2d115..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAntiJoinExec.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-
-import java.io.IOException;
-
-public class HashAntiJoinExec extends HashJoinExec {
-  private Tuple rightNullTuple;
-
-  public HashAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
-                      PhysicalExec right) {
-    super(context, plan, left, right);
-    // NUll Tuple
-    rightNullTuple = new VTuple(leftChild.outColumnNum);
-    for (int i = 0; i < leftChild.outColumnNum; i++) {
-      rightNullTuple.put(i, NullDatum.get());
-    }
-  }
-
-  /**
-   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
-   * next() method finds the first unmatched tuple from both tables.
-   *
-   * For each left tuple, next() tries to find the right tuple from the hash table. If there is no hash bucket
-   * in the hash table. It returns a tuple. If next() find the hash bucket in the hash table, it reads tuples in
-   * the found bucket sequentially. If it cannot find tuple in the bucket, it returns a tuple.
-   *
-   * @return The tuple which is unmatched to a given join condition.
-   * @throws IOException
-   */
-  public Tuple next() throws IOException {
-    if (first) {
-      loadRightToHashTable();
-    }
-
-    Tuple rightTuple;
-    boolean notFound;
-
-    while(!finished) {
-
-      // getting new outer
-      leftTuple = leftChild.next(); // it comes from a disk
-      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-        finished = true;
-        return null;
-      }
-
-      // Try to find a hash bucket in in-memory hash table
-      getKeyLeftTuple(leftTuple, leftKeyTuple);
-      if (tupleSlots.containsKey(leftKeyTuple)) {
-        // if found, it gets a hash bucket from the hash table.
-        iterator = tupleSlots.get(leftKeyTuple).iterator();
-      } else {
-        // if not found, it returns a tuple.
-        frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        return outTuple;
-      }
-
-      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
-      // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
-      notFound = true;
-      while (notFound && iterator.hasNext()) {
-        rightTuple = iterator.next();
-        frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
-          notFound = false;
-        }
-      }
-
-      if (notFound) { // if there is no matched tuple
-        frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
-        break;
-      }
-    }
-
-    return outTuple;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d7645252/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
new file mode 100644
index 0000000..ac591d3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class HashFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+  // pairs of join key columns: element [0] is resolved against the left schema, [1] against the right
+  protected List<Column[]> joinKeyPairs;
+
+  // temporary tuples and state of the hash join
+  protected boolean first = true; // true until the right child has been loaded into tupleSlots
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null; // projection target handed back by next()
+  protected Map<Tuple, List<Tuple>> tupleSlots; // join key -> right tuples carrying that key
+  protected Iterator<Tuple> iterator = null; // walks the bucket of the current left tuple
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple; // reusable buffer for the join key of leftTuple
+  // positions of the join key columns within the left and the right schema
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false; // set once the left input and the unmatched right tuples are exhausted
+  protected boolean shouldGetLeftTuple = true; // true when next() must fetch a new left tuple
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+  // column counts of the two inputs, used to build the null-padded side of unmatched tuples
+  private int rightNumCols;
+  private int leftNumCols;
+  private Map<Tuple, Boolean> matched; // per join key: true once it was matched or fully drained
+
+  public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = this.joinQual.newContext();
+
+    // hash table built from the right (inner) input: join key -> right tuples having that key
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+    // keyed exactly like tupleSlots; the flag starts as false and is set to true once the key
+    // has been joined with a left tuple (or once its unmatched right tuples have been emitted)
+    this.matched = new HashMap<Tuple, Boolean>(10000);
+
+    // resolve the positions of the join key columns within each input schema
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(this.joinQual,
+        outer.getSchema(), inner.getSchema());
+    final int numKeys = joinKeyPairs.size();
+    this.leftKeyList = new int[numKeys];
+    this.rightKeyList = new int[numKeys];
+    for (int k = 0; k < numKeys; k++) {
+      Column[] keyPair = joinKeyPairs.get(k);
+      leftKeyList[k] = outer.getSchema().getColumnId(keyPair[0].getQualifiedName());
+      rightKeyList[k] = inner.getSchema().getColumnId(keyPair[1].getQualifiedName());
+    }
+
+    // projection of the joined tuple onto the output schema
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // column counts, used for padding the missing side with nulls
+    this.leftNumCols = outer.getSchema().getColumnNum();
+    this.rightNumCols = inner.getSchema().getColumnNum();
+
+    // reusable buffers for the joined tuple, the projected result and the left join key
+    this.frameTuple = new FrameTuple();
+    this.outTuple = new VTuple(outSchema.getColumnNum());
+    this.leftKeyTuple = new VTuple(leftKeyList.length);
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int slot = 0, numKeys = leftKeyList.length; slot < numKeys; slot++) {
+      keyTuple.put(slot, outerTuple.get(leftKeyList[slot])); // copy one left join-key column
+    }
+  }
+
+  /** Removes and returns one right tuple whose join key no left tuple matched; null if none is left. */
+  public Tuple getNextUnmatchedRight() {
+    for (Map.Entry<Tuple, Boolean> keyState : matched.entrySet()) {
+      if (Boolean.TRUE.equals(keyState.getValue())) {
+        continue; // joined with a left tuple, or already drained
+      }
+
+      List<Tuple> bucket = tupleSlots.get(keyState.getKey());
+      if (bucket == null || bucket.isEmpty()) {
+        keyState.setValue(true); // stale flag without tuples behind it: nothing to emit
+        continue;
+      }
+
+      Tuple unmatchedTuple = bucket.remove(0);
+      if (bucket.isEmpty()) {
+        keyState.setValue(true); // the last tuple of this key has just been handed out
+      }
+      return unmatchedTuple;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the next joined tuple, or null when the join is exhausted. Left tuples are probed
+   * against the hash table built from the right input; a left tuple whose key is absent from
+   * the table is emitted with the right side null-padded. Once the left input is exhausted, the
+   * right tuples whose key was never matched are emitted with the left side null-padded.
+   *
+   * @return the projected output tuple (a buffer reused across calls), or null at the end
+   * @throws IOException if reading from a child operator fails
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable(); // build phase: runs once, and again after each rescan
+    }
+
+    while (!finished) {
+      if (shouldGetLeftTuple) { // the bucket of the previous left tuple is used up
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          // left input exhausted: drain the right tuples whose key never found a partner
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if (unmatchedRightTuple == null) {
+            // outTuple is deliberately kept: nulling the buffer would presumably make
+            // projector.terminate() fail on the first tuple produced after a rescan()
+            finished = true;
+            return null;
+          }
+          frameTuple.set(TupleUtil.createNullPaddedTuple(leftNumCols), unmatchedRightTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        }
+
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
+        List<Tuple> bucket = tupleSlots.get(leftKeyTuple);
+        if (bucket == null) {
+          // no right tuple shares this key: a full outer join still emits the left tuple
+          frameTuple.set(leftTuple, TupleUtil.createNullPaddedTuple(rightNumCols));
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple; // shouldGetLeftTuple stays true, so the next call reads a new left tuple
+        }
+        iterator = bucket.iterator();
+        shouldGetLeftTuple = false;
+      }
+
+      // probe phase: pair the current left tuple with the next right tuple of its bucket
+      Tuple rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple);
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      boolean joinable = joinQual.terminate(qualCtx).asBool();
+      if (joinable) {
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        // leftKeyTuple still holds the key of leftTuple, so it does not have to be rebuilt
+        matched.put(leftKeyTuple, true);
+      }
+
+      if (!iterator.hasNext()) { // bucket exhausted: move on to the next left tuple
+        shouldGetLeftTuple = true;
+      }
+
+      if (joinable) {
+        return outTuple;
+      }
+      // NOTE(review): a left tuple whose bucket holds no right tuple satisfying joinQual is dropped
+      // without a right-padded result; confirm joinQual can never be false for equal join keys
+    }
+    return null; // reached only once finished is set
+  }
+
+  /**
+   * Build phase: drains the right child into tupleSlots, grouping its tuples by join key, and
+   * registers every new key in matched as not yet matched.
+   *
+   * @throws IOException if reading from the right child fails
+   */
+  protected void loadRightToHashTable() throws IOException {
+    for (Tuple rightTuple = rightChild.next(); rightTuple != null; rightTuple = rightChild.next()) {
+      Tuple joinKey = new VTuple(joinKeyPairs.size());
+      for (int slot = 0; slot < rightKeyList.length; slot++) {
+        joinKey.put(slot, rightTuple.get(rightKeyList[slot]));
+      }
+
+      List<Tuple> bucket = tupleSlots.get(joinKey);
+      if (bucket == null) { // first tuple seen for this key
+        bucket = new ArrayList<Tuple>();
+        tupleSlots.put(joinKey, bucket);
+        matched.put(joinKey, false);
+      }
+      // NOTE(review): the tuple is stored by reference; assumes rightChild returns a fresh tuple per call
+      bucket.add(rightTuple);
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    tupleSlots.clear(); // drop the previous build phase; next() rebuilds it because first is reset
+    matched.clear(); // keep the per-key flags in step with tupleSlots so no stale key survives
+    first = true;
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+    outTuple = new VTuple(outSchema.getColumnNum()); // fresh buffer, in case the finished scan left it null
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear(); // drops the in-memory hash table; NOTE(review): matched is left populated
+  }
+
+  public JoinNode getPlan() {
+    return plan; // the logical join node this operator was created from
+  }
+}
+