You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/02 03:33:18 UTC

[1/3] TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)

Updated Branches:
  refs/heads/master 406337d9d -> 3e6d684a9


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
new file mode 100644
index 0000000..a946934
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestLeftOuterNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestLeftOuterNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  @Before
+  public void setUp() throws Exception { // starts the catalog and writes the dep3, job3, emp3 and phone3 CSV fixtures
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+        StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    catalog.addTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+        StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    catalog.addTable(job3);
+
+
+
+    //---------------------emp3 (rows listed in insertion order) --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+    for (int i = 1; i < 4; i += 2) { // i = 1, 3: two employees (ids 10+i and 20+i) per department, job_id = 100+i
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) { // i = 5, 7, 9: one employee per department, job_id is a null datum
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    catalog.addTable(emp3);
+
+    // ---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is intentionally left empty: no tuples are appended before close()
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+        StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    appender5.init();
+    
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    catalog.addTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster(); // stops the catalog cluster that setUp() started for this test
+  }
+  
+  String[] QUERIES = {
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id", // 0: neither operand has nulls in the join key
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id", // 1: the right operand (emp3) has null join keys
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id", // 2: the left operand (emp3) has null join keys
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id", // 3: the right operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id" // 4: the left operand (phone3) is empty
+  };
+
+  /**
+   * Runs QUERIES[0] (dep3 LEFT OUTER JOIN emp3 on dep_id): seven employee matches plus
+   * five null-padded even-numbered departments give the twelve rows asserted below.
+   */
+  @Test
+  public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
+    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // If the planner picked the hash join, swap in a nested-loop join over the same plan and children.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
+      LeftOuterNLJoinExec aJoin = new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count++;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  /**
+   * Runs QUERIES[1] (job3 LEFT OUTER JOIN emp3 on job_id), forcing a nested-loop join
+   * when the planner picks the hash join. Jobs 101 and 103 each match two employees and
+   * job 102 matches none (it is null-padded), so five rows are expected.
+   */
+  @Test
+  public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may pick the hash-based operator for this input; if so, swap in
+    // a nested-loop join over the same plan and children so that
+    // LeftOuterNLJoinExec is the operator actually exercised.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
+      LeftOuterNLJoinExec aJoin =
+          new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+    }
+
+    // Only the result cardinality is verified for now.
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(5, count);
+  }
+
+  /**
+   * Runs QUERIES[2] (emp3 LEFT OUTER JOIN job3 on job_id), forcing a nested-loop join when the
+   * planner picks the hash join. Each of the seven emp3 rows is emitted once, so seven rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr context = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may pick the hash-based operator for this input; if so, swap in
+    // a nested-loop join over the same plan and children so that
+    // LeftOuterNLJoinExec is the operator actually exercised.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
+      LeftOuterNLJoinExec aJoin =
+          new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+    }
+
+    // Only the result cardinality is verified for now.
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  /**
+   * Runs QUERIES[3] (emp3 LEFT OUTER JOIN phone3 on emp_id), forcing a nested-loop join when the
+   * planner picks the hash join. phone3 is empty, so each of the seven emp3 rows is emitted null-padded.
+   */
+  @Test
+  public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr context = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may pick the hash-based operator for this input; if so, swap in
+    // a nested-loop join over the same plan and children so that
+    // LeftOuterNLJoinExec is the operator actually exercised.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
+      LeftOuterNLJoinExec aJoin =
+          new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+    }
+
+    // Only the result cardinality is verified for now.
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+  /**
+   * Runs QUERIES[4] (phone3 LEFT OUTER JOIN emp3 on emp_id), forcing a nested-loop join when the
+   * planner picks the hash join. The preserved left side (phone3) is empty, so no rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr context = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may pick the hash-based operator for this input; if so, swap in
+    // a nested-loop join over the same plan and children so that
+    // LeftOuterNLJoinExec is the operator actually exercised.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec join = (LeftOuterHashJoinExec) proj.getChild();
+      LeftOuterNLJoinExec aJoin =
+          new LeftOuterNLJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+    }
+
+    // Only the result cardinality is verified for now.
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(0, count);
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index ee5ab83..53df9f7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -179,7 +179,7 @@ public class TestMergeJoinExec {
       assertTrue(i == tuple.getInt(0).asInt4());
       assertTrue(i == tuple.getInt(1).asInt4());
       assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue((10 + i) == tuple.getInt(3).asInt4());
 
       i += 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
new file mode 100644
index 0000000..d237f0c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// Right outer hash join is not a physical operator in itself; it runs on LeftOuterHashJoinExec with the order of its inputs switched.
+public class TestRightOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestRightOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  
+
+  @Before
+  public void setUp() throws Exception { // starts the catalog and writes the dep3, job3 and emp3 CSV fixtures
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+        StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    catalog.addTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+        StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    catalog.addTable(job3);
+
+
+
+    //---------------------emp3 (rows listed in insertion order) --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+    for (int i = 1; i < 4; i += 2) { // i = 1, 3: two employees (ids 10+i and 20+i) per department, job_id = 100+i
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) { // i = 5, 7, 9: one employee per department, job_id is a null datum
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    catalog.addTable(emp3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster(); // stops the catalog cluster that setUp() started for this test
+  }
+
+  String[] QUERIES = {
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id", // 0: neither operand has nulls in the join key
+      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id", // 1: the left operand (emp3) has null join keys
+      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id" // 2: the right operand (emp3) has null join keys
+  };
+
+  @Test
+  public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+       //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(12, count);
+    }
+  }
+
+
+  @Test
+  public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+       //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(5, count);
+    }
+  }
+
+    @Test
+  public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
+    
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+      //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(7, count);
+    }
+  }
+
+
+}
+ //--camelia

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
new file mode 100644
index 0000000..a37d409
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestRightOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestRightOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static final int UNGENERATED_PID = -1;
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  /**
+   * Starts a catalog cluster and creates the CSV tables used by the tests:
+   * dep3 (10 rows), dep4 (11 rows), job3 (3 rows), emp3 (7 rows) and phone3 (no rows).
+   * Each table is written through an Appender and then registered in the catalog.
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name   | loc_id
+    //--------------------------------
+    //  0     | dept_0     | 1000
+    //  1     | dept_1     | 1001
+    //  2     | dept_2     | 1002
+    //  3     | dept_3     | 1003
+    //  4     | dept_4     | 1004
+    //  5     | dept_5     | 1005
+    //  6     | dept_6     | 1006
+    //  7     | dept_7     | 1007
+    //  8     | dept_8     | 1008
+    //  9     | dept_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+        StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    appender1.init();
+    // a single tuple instance is refilled and appended for every row
+    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    catalog.addTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // same layout as dep3 plus one extra row (dep_id 10)
+    // dep_id | dep_name   | loc_id
+    //--------------------------------
+    //  0     | dept_0     | 1000
+    //  1     | dept_1     | 1001
+    //  2     | dept_2     | 1002
+    //  3     | dept_3     | 1003
+    //  4     | dept_4     | 1004
+    //  5     | dept_5     | 1005
+    //  6     | dept_6     | 1006
+    //  7     | dept_7     | 1007
+    //  8     | dept_8     | 1008
+    //  9     | dept_9     | 1009
+    // 10     | dept_10    | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(dep4Schema,
+        StoreType.CSV);
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Path);
+    appender4.init();
+    Tuple tuple4 = new VTuple(dep4Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc("dep4", dep4Meta, dep4Path);
+    catalog.addTable(dep4);
+
+
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+        StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    catalog.addTable(job3);
+
+
+
+    //---------------------emp3 (rows listed in the order they are appended) --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+    // employees with a job: two rows (emp_id 10 + i and 20 + i) for each of dep_id 1 and 3
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    // employees without a job: one row for each of dep_id 5, 7 and 9, with a NULL job_id
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    catalog.addTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+        StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    appender5.init();
+
+    // no tuples are added: the appender is only initialized, flushed and closed
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    catalog.addTable(phone3);
+
+
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  /**
+   * Shuts down the catalog cluster that setUp() started.
+   */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // Right outer join queries exercised by the tests below; the index matches the test number.
+  String[] QUERIES = {
+      // [0] no NULL join keys; dep3 rows without an employee get a null-padded emp3 side
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] NULL join keys (emp3.job_id) on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id",
+      // [2] NULL join keys (emp3.job_id) on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no NULL join keys; the right operand (dep4) still has a row after the left one is exhausted
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 right outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] the right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 right outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] the left operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 right outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  /**
+   * Runs QUERIES[0] (emp3 right outer join dep3 on dep_id) with the merge join algorithm enforced.
+   * Expects 12 rows: 7 matched employee rows plus 5 departments that have no employee.
+   */
+  @Test
+  public final void testRightOuterMergeJoin0() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[0])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] depFragments = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(empFragments, depFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    // an exhausted executor must keep returning null
+    assertNull(root.next());
+    root.close();
+    assertEquals(12, rows);
+  }
+
+
+  /**
+   * Runs QUERIES[1] (emp3 right outer join job3 on job_id) with the merge join algorithm enforced.
+   * Expects 5 rows: two employees each for jobs 101 and 103, plus job 102 without any employee.
+   */
+  @Test
+  public final void testRightOuter_MergeJoin1() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[1])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] jobFragments = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(jobFragments, empFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    // an exhausted executor must keep returning null
+    assertNull(root.next());
+    root.close();
+    assertEquals(5, rows);
+  }
+
+  /**
+   * Runs QUERIES[2] (job3 right outer join emp3 on job_id) with the merge join algorithm enforced.
+   * Expects 7 rows: one per emp3 row, including the employees whose job_id is NULL.
+   */
+  @Test
+  public final void testRightOuterMergeJoin2() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[2])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] jobFragments = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(jobFragments, empFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    // an exhausted executor must keep returning null
+    assertNull(root.next());
+    root.close();
+    assertEquals(7, rows);
+  }
+
+
+  /**
+   * Runs QUERIES[3] (emp3 right outer join dep4 on dep_id) with the merge join algorithm enforced.
+   * Expects 13 rows: 7 matched employee rows plus 6 departments that have no employee.
+   */
+  @Test
+  public final void testRightOuter_MergeJoin3() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[3])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] depFragments = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(empFragments, depFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    // an exhausted executor must keep returning null
+    assertNull(root.next());
+    root.close();
+    assertEquals(13, rows);
+  }
+
+  /**
+   * Runs QUERIES[4] (emp3 right outer join phone3 on emp_id) with the merge join algorithm enforced.
+   * Expects no rows because the right operand (phone3) is empty.
+   */
+  @Test
+  public final void testRightOuter_MergeJoin4() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[4])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] phoneFragments = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(empFragments, phoneFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    // an exhausted executor must keep returning null
+    assertNull(root.next());
+    root.close();
+    assertEquals(0, rows);
+  }
+
+  /**
+   * Runs QUERIES[5] (phone3 right outer join emp3 on emp_id) with the merge join algorithm enforced.
+   * Expects 7 rows: the left operand (phone3) is empty, so every emp3 row is emitted once.
+   */
+  @Test
+  public final void testRightOuterMergeJoin5() throws IOException, PlanningException {
+    LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[5])).getRootBlock().getRoot();
+    JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer mergeEnforcer = new Enforcer();
+    mergeEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] phoneFragments = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] allFragments = TUtil.concat(phoneFragments, empFragments);
+
+    Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5");
+    TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+    taskContext.setEnforcer(mergeEnforcer);
+
+    PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+    // the enforced algorithm must have produced a right outer merge join under the projection
+    assertTrue(((ProjectionExec) root).getChild() instanceof RightOuterMergeJoinExec);
+
+    root.init();
+    int rows = 0;
+    for (Tuple row = root.next(); row != null; row = root.next()) {
+      //TODO check contents
+      rows++;
+    }
+    root.close();
+    assertEquals(7, rows);
+  }
+}


[2/3] TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)

Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
new file mode 100644
index 0000000..c8df5c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  // temporary tuples and state for the sort-merge join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private Tuple nextLeft = null;
+
+  private final List<Tuple> leftTupleSlots;
+  private final List<Tuple> innerTupleSlots;
+
+  private JoinTupleComparator joinComparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  private boolean end = false;
+
+  // projection
+  private final Projector projector;
+  private final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  private boolean endInPopulationStage = false;
+  private boolean initRightDone = false;
+
+  /**
+   * Creates a right outer merge join executor over two child executors.
+   *
+   * @param context the task attempt context handed to the parent executor
+   * @param plan the logical join node; it must carry a join qualifier
+   * @param outer the left (outer) child executor
+   * @param inner the right (inner) child executor
+   * @param outerSortKey sort specs of the left child, used to compare tuples across both sides
+   * @param innerSortKey sort specs of the right child, used to compare tuples across both sides
+   * @throws IllegalArgumentException if the plan has no join qualifier
+   */
+  public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                                 PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+
+    // logical plan pieces
+    joinNode = plan;
+    joinQual = plan.getJoinQual();
+    qualCtx = joinQual.newContext();
+
+    // buffers holding the tuples that share the current join key on each side
+    leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+
+    // comparators: one across both sides, and one per side derived from the join qualifier
+    SortSpec[][] sortKeys = { outerSortKey, innerSortKey };
+    joinComparator = new JoinTupleComparator(outer.getSchema(), inner.getSchema(), sortKeys);
+    tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+
+    // projection of the joined frame onto the output schema
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    evalContexts = projector.renew();
+
+    // reusable tuples for joining and for the result
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    // width of the NULL padding that stands in for a missing left tuple
+    leftNumCols = outer.getSchema().getColumnNum();
+  }
+
+  /**
+   * @return the logical join node this executor was created from
+   */
+  public JoinNode getPlan() {
+    return joinNode;
+  }
+
+  /**
+   * Builds a fresh tuple with the requested number of columns, each holding a NULL datum.
+   *
+   * @param columnNum the number of columns of the tuple
+   * @return a new tuple whose fields are all NULL
+   */
+  private Tuple createNullPaddedTuple(int columnNum){
+    VTuple padded = new VTuple(columnNum);
+    int column = 0;
+    while (column < columnNum) {
+      padded.put(column, DatumFactory.createNullDatum());
+      column++;
+    }
+    return padded;
+  }
+
+  /**
+   * Returns the next joined tuple, or null when the join is exhausted.
+   *
+   * Each call loops through the following stages until a tuple can be returned:
+   * <ul>
+   *   <li>finalizing stage: once an input is exhausted, every remaining right tuple is emitted with a
+   *   NULL-padded left side</li>
+   *   <li>initialization stage: reads the first tuple of each side</li>
+   *   <li>move forwarding stage: advances the cursors until the join keys match; a right tuple that is
+   *   passed over is emitted with a NULL-padded left side</li>
+   *   <li>population and output: the tuples sharing the matched key are buffered in slots on each side
+   *   and their combinations are emitted one per call</li>
+   * </ul>
+   *
+   * The same outTuple instance is returned by every call.
+   *
+   * @return the next result tuple, or null if there are no more results
+   * @throws IOException presumably raised by the child executors' next() -- confirm against PhysicalExec
+   */
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      // a new round starts on the very first call (both positions are -1) or when every
+      // combination of the buffered left and right slots has been emitted
+      boolean newRound = false;
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      if ((posRightTupleSlots == innerTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if (newRound) {
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN FINALIZING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // The finalizing stage, where the tuples remaining on the right side only are emitted as
+        // left-padded results
+        if (end) {
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if(rightTuple == null) {
+            return null;
+          } else {
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+
+            // we simulate we found a match, which is exactly the null padded one
+            rightTuple = rightChild.next();
+
+            return outTuple;
+          }
+        }
+        //////////////////////////////////////////////////////////////////////
+        // END FINALIZING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN INITIALIZATION STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // This stage reads the first tuple on each side
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+
+          if (leftTuple == null) {
+            // the left side is empty: loop again so the finalizing stage emits the right tuples
+            end = true;
+            continue;
+          }
+        }
+
+        if(rightTuple == null){
+          rightTuple = rightChild.next();
+
+          if(rightTuple != null){
+            initRightDone = true;
+          }
+          else {
+            // the right side is empty: the finalizing stage will then return null
+            initRightDone = true;
+            end = true;
+            continue;
+          }
+        }
+        //////////////////////////////////////////////////////////////////////
+        // END INITIALIZATION STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        innerTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+
+        //////////////////////////////////////////////////////////////////////
+        // BEGIN MOVE FORWARDING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // This stage moves forward a tuple cursor on each side relation until a match
+        // is found
+        int cmp;
+        while ((end != true) && ((cmp = joinComparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          // cmp > 0: the right tuple is lower than the current left tuple, so it is treated as
+          // having no match on the left side and is emitted with a NULL-padded left side
+          if (cmp > 0) {
+            // before getting a new tuple from the right,  a left null padded tuple should be built
+            // output a tuple with the nulls padded left tuple
+            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+
+            // we simulate we found a match, which is exactly the null padded one
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // If the left tuple is lower than the right tuple, just move forward the left tuple cursor.
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+              // in original algorithm we had return null ,
+              // but now we need to continue the end processing phase for remaining unprocessed right tuples
+            }
+          } // if (cmp<0)
+        } // while
+        //////////////////////////////////////////////////////////////////////
+        // END MOVE FORWARDING STAGE
+        //////////////////////////////////////////////////////////////////////
+
+        // once a match is found, retain all tuples with this key in tuple slots on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endOuter = false;
+          boolean endInner = false;
+
+          // buffer every consecutive left tuple that compares equal to the matched one; the loop
+          // leaves leftTuple on the first tuple after that run (or null)
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if( leftTuple == null) {
+              endOuter = true;
+            }
+          } while ((endOuter != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+          // same buffering for the right side
+          previous = new VTuple(rightTuple);
+
+          do {
+            innerTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endInner = true;
+            }
+
+          } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          // an input ran out while buffering: the slots still have to be emitted before finalizing
+          if ((endOuter == true) || (endInner == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+        } // if end false
+      } // if newRound
+
+
+      // Now output result matching tuples from the slots
+      // if either we haven't reached the end on any side, or we did reach the end on one (or both) sides
+      // but that happened in the slots population step (i.e. refers to next round)
+
+      if ((end == false) || ((end == true) && (endInPopulationStage == true))){
+
+        // first output of a round: take the first buffered left tuple
+        if(posLeftTupleSlots == 0){
+          nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+
+        if(posRightTupleSlots <= (innerTupleSlots.size() -1)) {
+
+          Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+
+          frameTuple.set(nextLeft, aTuple);
+          // NOTE(review): the value produced by this qual evaluation is not read here
+          joinQual.eval(qualCtx, inSchema, frameTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+
+        } else {
+          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(nextLeft, aTuple);
+            // NOTE(review): the value produced by this qual evaluation is not read here
+            joinQual.eval(qualCtx, inSchema, frameTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            return outTuple;
+          }
+          // otherwise both slot lists are used up; the next loop iteration starts a new round
+        }
+      } // the second if end false
+    } // for
+  }
+
+  /**
+   * Restarts the join from the beginning of both inputs.
+   *
+   * Besides rescanning the children and clearing the tuple slots, every piece of iteration state
+   * kept by next() is reset. Otherwise a look-ahead tuple read before the rescan, or an end flag
+   * left set by a finished scan, would make next() compare stale tuples or go straight to the
+   * finalizing stage.
+   *
+   * @throws IOException if rescanning the child executors fails
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    // buffered tuples of the current join key and the cursors into them
+    leftTupleSlots.clear();
+    innerTupleSlots.clear();
+    posRightTupleSlots = -1;
+    posLeftTupleSlots = -1;
+
+    // look-ahead tuples read from the children before the rescan are no longer valid
+    leftTuple = null;
+    rightTuple = null;
+    nextLeft = null;
+
+    // stage flags, back to their initial values
+    end = false;
+    endInPopulationStage = false;
+    initRightDone = false;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index a33cbd7..22bcd06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -29,6 +29,13 @@ import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.query.exception.InvalidQueryException;
 
+import org.apache.tajo.engine.eval.EvalType; 
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.catalog.Column;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.Iterator;
+
 import java.util.List;
 import java.util.Stack;
 
@@ -85,11 +92,112 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
     return selNode;
   }
 
+  /**
+   * Tells whether the given join type is one of the outer join variants.
+   *
+   * @param joinType the join type to test
+   * @return true for LEFT_OUTER, RIGHT_OUTER and FULL_OUTER; false for anything else
+   */
+  private boolean isOuterJoin(JoinType joinType) {
+    if (joinType == JoinType.LEFT_OUTER) {
+      return true;
+    }
+    if (joinType == JoinType.RIGHT_OUTER) {
+      return true;
+    }
+    return joinType == JoinType.FULL_OUTER;
+  }
+
   public LogicalNode visitJoin(LogicalPlan plan, JoinNode joinNode, Stack<LogicalNode> stack, List<EvalNode> cnf)
       throws PlanningException {
     LogicalNode left = joinNode.getRightChild();
     LogicalNode right = joinNode.getLeftChild();
 
+    // here we should stop selection pushdown on the null supplying side(s) of an outer join
+    // get the two operands of the join operation as well as the join type
+    JoinType joinType = joinNode.getJoinType();
+    EvalNode joinQual = joinNode.getJoinQual();
+    if (joinQual != null && isOuterJoin(joinType)) {
+
+      // if both are fields
+       if (joinQual.getLeftExpr().getType() == EvalType.FIELD && joinQual.getRightExpr().getType() == EvalType.FIELD) {
+
+          String leftTableName = ((FieldEval) joinQual.getLeftExpr()).getTableId();
+          String rightTableName = ((FieldEval) joinQual.getRightExpr()).getTableId();
+          List<String> nullSuppliers = Lists.newArrayList();
+          String [] leftLineage = PlannerUtil.getLineage(joinNode.getLeftChild());
+          String [] rightLineage = PlannerUtil.getLineage(joinNode.getRightChild());
+          Set<String> leftTableSet = Sets.newHashSet(leftLineage);
+          Set<String> rightTableSet = Sets.newHashSet(rightLineage);
+
+          // some verification
+          if (joinType == JoinType.FULL_OUTER) {
+             nullSuppliers.add(leftTableName);
+             nullSuppliers.add(rightTableName);
+
+             // verify that these null suppliers are indeed in the left and right sets
+             if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
+                throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+             }
+             if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
+                throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+             }
+
+          } else if (joinType == JoinType.LEFT_OUTER) {
+             nullSuppliers.add(((ScanNode)joinNode.getRightChild()).getTableName()); 
+             //verify that this null supplier is indeed in the right sub-tree
+             if (!rightTableSet.contains(nullSuppliers.get(0))) {
+                 throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+             }
+          } else if (joinType == JoinType.RIGHT_OUTER) {
+            if (((ScanNode)joinNode.getRightChild()).getTableName().equals(rightTableName)) {
+              nullSuppliers.add(leftTableName);
+            } else {
+              nullSuppliers.add(rightTableName);
+            }
+
+            // verify that this null supplier is indeed in the left sub-tree
+            if (!leftTableSet.contains(nullSuppliers.get(0))) {
+              throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+            }
+          }
+         
+         // retain in this outer join node's JoinQual those selection predicates
+         // related to the outer join's null supplier(s)
+         List<EvalNode> matched2 = Lists.newArrayList();
+         for (EvalNode eval : cnf) {
+            
+            Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
+            Set<String> tableNames = Sets.newHashSet();
+            // getting distinct table references
+            for (Column col : columnRefs) {
+              if (!tableNames.contains(col.getQualifier())) {
+                tableNames.add(col.getQualifier());
+              }
+            }
+            
+            //if the predicate involves any of the null suppliers
+            boolean shouldKeep=false;
+            Iterator<String> it2 = nullSuppliers.iterator();
+            while(it2.hasNext()){
+              if(tableNames.contains(it2.next()) == true) {
+                   shouldKeep = true; 
+              }
+            }
+
+            if(shouldKeep == true) {
+                matched2.add(eval);
+            }
+            
+          }
+
+          //merge the retained predicates and establish them in the current outer join node. Then remove them from the cnf
+          EvalNode qual2 = null;
+          if (matched2.size() > 1) {
+             // merged into one eval tree
+             qual2 = EvalTreeUtil.transformCNF2Singleton(
+                        matched2.toArray(new EvalNode [matched2.size()]));
+          } else if (matched2.size() == 1) {
+             // if the number of matched expr is one
+             qual2 = matched2.get(0);
+          }
+
+          if (qual2 != null) {
+             EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+             joinNode.setJoinQual(conjQual2);
+             cnf.removeAll(matched2);
+          } // for the remaining cnf, push it as usual
+       }
+    }
+
     visitChild(plan, left, stack, cnf);
     visitChild(plan, right, stack, cnf);
 
@@ -112,8 +220,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
 
     if (qual != null) {
       if (joinNode.hasJoinQual()) {
-        EvalNode conjQual = EvalTreeUtil.
-            transformCNF2Singleton(joinNode.getJoinQual(), qual);
+        EvalNode conjQual = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual);
         joinNode.setJoinQual(conjQual);
       } else {
         joinNode.setJoinQual(qual);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 3e9d958..53a8449 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -385,4 +385,20 @@ public class TupleUtil {
     }
     return new TupleRange(target, startTuple, endTuple);
   }
+
+  /**
+   * Builds a tuple with the requested number of fields, each of which holds a NULL datum.
+   * Such tuples are usually used by outer join algorithms.
+   *
+   * @param size the number of fields of the tuple to create
+   * @return the newly created tuple, filled with NULL values
+   */
+  public static Tuple createNullPaddedTuple(int size){
+    VTuple nullPadded = new VTuple(size);
+    // fill every position, from the first field to the last
+    for (int fieldIdx = 0; fieldIdx < size; fieldIdx++) {
+      nullPadded.put(fieldIdx, DatumFactory.createNullDatum());
+    }
+    return nullPadded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
new file mode 100644
index 0000000..6fb7f61
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+/**
+ * Tests the full outer join executed with the in-memory hash join algorithm.
+ *
+ * Before each test, four small CSV tables (dep3, job3, emp3 and the empty phone3) are written
+ * under the test directory and registered in the catalog. Each test then plans one of
+ * {@link #QUERIES}, enforces the hash join algorithm and checks the number of produced rows.
+ */
+public class TestFullOuterHashJoinExec {
+  private static final String TEST_PATH = "target/test-data/TestFullOuterHashJoinExec";
+
+  private static final String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left side
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [3] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    dep3 = createDep3();
+    job3 = createJob3();
+    emp3 = createEmp3();
+    phone3 = createPhone3();
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  /**
+   * Writes the given rows as a CSV table named {@code tableName} under the test directory
+   * and registers the table in the catalog.
+   *
+   * @param tableName the catalog name of the table; the data file is {@code tableName + ".csv"}
+   * @param schema the schema of the table
+   * @param rows the rows to store, one datum array per row; may be empty
+   * @return the descriptor of the registered table
+   */
+  private TableDesc storeTable(String tableName, Schema schema, Datum[][] rows) throws Exception {
+    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    Path path = new Path(testDir, tableName + ".csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
+    appender.init();
+
+    // a single tuple is reused for all rows, as each row is appended right after it is filled
+    Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+    for (Datum[] row : rows) {
+      tuple.put(row);
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    TableDesc desc = CatalogUtil.newTableDesc(tableName, meta, path);
+    catalog.addTable(desc);
+    return desc;
+  }
+
+  //----------------- dep3 ------------------------------
+  // dep_id | dep_name  | loc_id
+  //--------------------------------
+  //  0     | dept_0    | 1000
+  //  1     | dept_1    | 1001
+  //  ...
+  //  9     | dept_9    | 1009
+  private TableDesc createDep3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("dep_id", Type.INT4);
+    schema.addColumn("dep_name", Type.TEXT);
+    schema.addColumn("loc_id", Type.INT4);
+
+    Datum[][] rows = new Datum[10][];
+    for (int i = 0; i < rows.length; i++) {
+      rows[i] = new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) };
+    }
+    return storeTable("dep3", schema, rows);
+  }
+
+  //----------------- job3 ------------------------------
+  //  job_id  | job_title
+  // ----------------------
+  //   101    |  job_101
+  //   102    |  job_102
+  //   103    |  job_103
+  private TableDesc createJob3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("job_id", Type.INT4);
+    schema.addColumn("job_title", Type.TEXT);
+
+    Datum[][] rows = new Datum[3][];
+    for (int i = 1; i < 4; i++) {
+      int jobId = 100 + i;
+      rows[i - 1] = new Datum[] { DatumFactory.createInt4(jobId),
+          DatumFactory.createText("job_" + jobId) };
+    }
+    return storeTable("job3", schema, rows);
+  }
+
+  //---------------------emp3 (in insertion order) --------------------
+  // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+  // ----------------------------------------------------------------
+  //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+  //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+  //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+  //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+  //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+  //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+  //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+  private TableDesc createEmp3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("emp_id", Type.INT4);
+    schema.addColumn("first_name", Type.TEXT);
+    schema.addColumn("last_name", Type.TEXT);
+    schema.addColumn("dep_id", Type.INT4);
+    schema.addColumn("salary", Type.FLOAT4);
+    schema.addColumn("job_id", Type.INT4);
+
+    Datum[][] rows = new Datum[7][];
+    int next = 0;
+    // two employees per department for departments 1 and 3, both with a job
+    for (int i = 1; i < 4; i += 2) {
+      rows[next++] = empRow(10 + i, i, DatumFactory.createInt4(100 + i));
+      rows[next++] = empRow(20 + i, i, DatumFactory.createInt4(100 + i));
+    }
+    // one employee per department for departments 5, 7 and 9, all without a job
+    for (int i = 5; i < 10; i += 2) {
+      rows[next++] = empRow(10 + i, i, DatumFactory.createNullDatum());
+    }
+    return storeTable("emp3", schema, rows);
+  }
+
+  /** Builds one emp3 row; names derive from the employee id and the salary from the department id. */
+  private static Datum[] empRow(int empId, int depId, Datum jobId) {
+    return new Datum[] { DatumFactory.createInt4(empId),
+        DatumFactory.createText("firstname_" + empId),
+        DatumFactory.createText("lastname_" + empId),
+        DatumFactory.createInt4(depId),
+        DatumFactory.createFloat4(123 * depId),
+        jobId };
+  }
+
+  //---------------------phone3 --------------------
+  // emp_id  | phone_number
+  // -----------------------------------------------
+  // this table is empty, no rows
+  private TableDesc createPhone3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("emp_id", Type.INT4);
+    schema.addColumn("phone_number", Type.TEXT);
+
+    return storeTable("phone3", schema, new Datum[0][]);
+  }
+
+  /** Splits the stored data of the given table into fragments. */
+  private Fragment[] fragmentsOf(String tableName, TableDesc desc) throws IOException {
+    return StorageManager.splitNG(conf, tableName, desc.getMeta(), desc.getPath(), Integer.MAX_VALUE);
+  }
+
+  /**
+   * Plans {@code query} with the in-memory hash join enforced on its top join node, verifies that
+   * the physical plan is a projection over a {@link FullOuterHashJoinExec}, runs it and checks
+   * the number of produced rows.
+   *
+   * @param query the SQL text to run
+   * @param fragments the fragments of both joined tables
+   * @param workDirPath the working directory of the task attempt
+   * @param expectedRows the number of rows the join must produce
+   */
+  private void assertJoinRowCount(String query, Fragment[] fragments, String workDirPath, int expectedRows)
+      throws IOException, PlanningException {
+    Expr expr = analyzer.parse(query);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    Path workDir = CommonTestingUtil.getTestDir(workDirPath);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), fragments, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+
+    int count = 0;
+    exec.init();
+    try {
+      while (exec.next() != null) {
+        //TODO check contents
+        count = count + 1;
+      }
+      // an exhausted executor must keep returning null
+      assertNull(exec.next());
+    } finally {
+      // release the executor even when reading or an assertion fails
+      exec.close();
+    }
+    assertEquals(expectedRows, count);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(fragmentsOf("dep3", dep3), fragmentsOf("emp3", emp3));
+    // 7 employees match a department, plus the 5 departments (0, 2, 4, 6, 8) without employees
+    assertJoinRowCount(QUERIES[0], merged, "target/test-data/TestFullOuterHashJoinExec0", 12);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(fragmentsOf("job3", job3), fragmentsOf("emp3", emp3));
+    // 4 employees match a job, plus job 102 without employees, plus 3 employees without a job
+    assertJoinRowCount(QUERIES[1], merged, "target/test-data/TestFullOuter_HashJoinExec1", 8);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(fragmentsOf("emp3", emp3), fragmentsOf("job3", job3));
+    // same data as query [1] with the operands swapped
+    assertJoinRowCount(QUERIES[2], merged, "target/test-data/TestFullOuterHashJoinExec2", 8);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(fragmentsOf("emp3", emp3), fragmentsOf("phone3", phone3));
+    // phone3 is empty, so each of the 7 employees appears once, padded with nulls
+    assertJoinRowCount(QUERIES[3], merged, "target/test-data/TestFullOuterHashJoinExec3", 7);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
new file mode 100644
index 0000000..5fde9b8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestFullOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestFullOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static final int UNGENERATED_PID = -1;
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    catalog.addTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    // 10     | dep_10    | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(dep4Schema,
+        StoreType.CSV);
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Path);
+    appender4.init();
+    Tuple tuple4 = new VTuple(dep4Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc("dep4", dep4Meta, dep4Path);
+    catalog.addTable(dep4);
+
+
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+        StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    catalog.addTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
+    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
+    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
+    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
+    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
+    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
+    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    catalog.addTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+        StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    appender5.init();
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    catalog.addTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster(); // runs after every test: shut down the catalog cluster held by util
+  }
+
+  String[] QUERIES = { // QUERIES[N] is the statement parsed by testFullOuterMergeJoinN
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] left operand (emp3) contains rows whose join key job_id is null
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [2] right operand (emp3) contains rows whose join key job_id is null
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls, right continues after left
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] left operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from phone3 full outer join emp3 on emp3.emp_id = phone3.emp_id",
+  };
+
+  @Test
+  public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]); // emp3 full outer join dep3 on dep_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec); // the enforced algorithm must have been honoured
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(12, count); // presumably 7 emp3 rows + 5 dep3 rows without employees; dep3 is filled outside this view - confirm
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]); // emp3 full outer join job3 on job_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags); // NOTE(review): job3 first, although the query names emp3 first - confirm order is irrelevant
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec); // the enforced algorithm must have been honoured
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(8, count); // 7 emp3 rows (4 matched, 3 with null job_id) + job 102, which no employee references
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]); // job3 full outer join emp3 on job_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec); // the enforced algorithm must have been honoured
+
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(8, count); // same operands as query [1] with sides swapped: 7 emp3 rows + unmatched job 102
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]); // emp3 full outer join dep4 on dep_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // MERGE_JOIN was enforced above, so the projection's child must be the full outer merge join
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(13, count); // presumably 7 emp3 rows + 6 dep4 rows without employees; dep4 is filled outside this view - confirm
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[4]); // emp3 full outer join phone3 (empty) on emp_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec); // the enforced algorithm must have been honoured
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(7, count); // phone3 is empty, so each of the 7 emp3 rows is emitted exactly once
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[5]); // phone3 (empty) full outer join emp3 on emp_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); // request a merge join for this join node
+
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof FullOuterMergeJoinExec); // the enforced algorithm must have been honoured
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    assertNull(exec.next()); // once exhausted, the executor must keep returning null
+    exec.close();
+    assertEquals(7, count); // phone3 is empty, so each of the 7 emp3 rows is emitted exactly once
+  }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
new file mode 100644
index 0000000..5486e8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLeftOuterHashJoinExec {
+  private TajoConf conf; // taken from util.getConfiguration() in setUp()
+  private final String TEST_PATH = "target/test-data/TestLeftOuterHashJoinExec"; // directory resolved into testDir by setUp()
+  private TajoTestingCluster util; // owns the catalog cluster started in setUp() and stopped in tearDown()
+  private CatalogService catalog; // catalog in which setUp() registers the test tables
+  private SQLAnalyzer analyzer; // parses the entries of QUERIES
+  private LogicalPlanner planner; // builds logical plans against catalog
+  private AbstractStorageManager sm; // handed to PhysicalPlannerImpl by the tests
+  private Path testDir; // parent directory of the CSV files written by setUp()
+
+  private TableDesc dep3; // 10 departments, dep_id 0..9
+  private TableDesc job3; // 3 jobs, job_id 101..103
+  private TableDesc emp3; // 7 employees, 3 of them with a null job_id
+  private TableDesc phone3; // deliberately empty table
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+        StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    catalog.addTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+        StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    catalog.addTable(job3);
+
+
+
+    //---------------------emp3 (rows listed in the order they are appended) --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+    for (int i = 1; i < 4; i += 2) { // i = 1, 3: two employees (10 + i and 20 + i) sharing dep_id i and job_id 100 + i
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) { // i = 5, 7, 9: one employee each, with a null job_id
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    catalog.addTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+        StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    appender5.init();
+    // no tuples are appended: phone3 is intentionally left empty
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    catalog.addTable(phone3);
+
+
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster(); // runs after every test: stop the catalog cluster started in setUp()
+  }
+
+  String[] QUERIES = { // QUERIES[N] is the statement parsed by the test method whose name ends in N
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] right operand (emp3) contains rows whose join key job_id is null
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id",
+      // [2] left operand (emp3) contains rows whose join key job_id is null
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id",
+      // [3] right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [4] left operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]); // dep3 left outer join emp3 on dep_id
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); // request an in-memory hash join
+
+    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof LeftOuterHashJoinExec); // the enforced algorithm must have been honoured
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents; only the number of produced rows is verified for now
+      count = count + 1;
+    }
+    exec.close();
+    assertEquals(12, count); // 7 joined rows (deps 1 and 3 twice; 5, 7, 9 once) + 5 departments without employees
+  }
+
+
+  @Test
+  public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+       //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(5, count);
+    }
+  }
+
+    @Test
+  public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
+    
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+      //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(7, count);
+    }
+  }
+
+
+   @Test
+  public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
+    
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+      //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(7, count);
+    }
+  }
+
+  
+   @Test
+  public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
+    
+    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof LeftOuterNLJoinExec) {
+      //for this small data set this is not likely to happen
+      
+      assertEquals(1, 0);
+    }
+    else{
+       Tuple tuple;
+       int count = 0;
+       int i = 1;
+       exec.init();
+  
+       while ((tuple = exec.next()) != null) {
+         //TODO check contents
+         count = count + 1;
+       }
+       exec.close();
+       assertEquals(0, count);
+    }
+  }
+  
+
+
+}
+ //--camelia


[3/3] git commit: TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)

Posted by hy...@apache.org.
TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3e6d684a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3e6d684a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3e6d684a

Branch: refs/heads/master
Commit: 3e6d684a9f1c7592c99d359c376956fcc9e917ba
Parents: 406337d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 2 10:29:03 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 2 10:32:23 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/tajo/datum/CharDatum.java   |  23 +-
 .../java/org/apache/tajo/datum/Float4Datum.java |  92 ++--
 .../java/org/apache/tajo/datum/Float8Datum.java |  64 ++-
 .../java/org/apache/tajo/datum/Int2Datum.java   |  65 ++-
 .../java/org/apache/tajo/datum/Int4Datum.java   |  58 ++-
 .../java/org/apache/tajo/datum/Int8Datum.java   |  63 ++-
 .../java/org/apache/tajo/datum/NullDatum.java   |   2 +-
 .../java/org/apache/tajo/datum/TextDatum.java   |  27 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 178 ++++++-
 .../planner/physical/FullOuterHashJoinExec.java | 252 +++++++++
 .../physical/FullOuterMergeJoinExec.java        | 334 ++++++++++++
 .../planner/physical/LeftOuterHashJoinExec.java | 214 ++++++++
 .../planner/physical/LeftOuterNLJoinExec.java   | 129 +++++
 .../physical/RightOuterMergeJoinExec.java       | 343 ++++++++++++
 .../planner/rewrite/FilterPushDownRule.java     | 111 +++-
 .../org/apache/tajo/engine/utils/TupleUtil.java |  16 +
 .../physical/TestFullOuterHashJoinExec.java     | 394 ++++++++++++++
 .../physical/TestFullOuterMergeJoinExec.java    | 516 +++++++++++++++++++
 .../physical/TestLeftOuterHashJoinExec.java     | 450 ++++++++++++++++
 .../physical/TestLeftOuterNLJoinExec.java       | 459 +++++++++++++++++
 .../planner/physical/TestMergeJoinExec.java     |   2 +-
 .../physical/TestRightOuterHashJoinExec.java    | 342 ++++++++++++
 .../physical/TestRightOuterMergeJoinExec.java   | 506 ++++++++++++++++++
 24 files changed, 4518 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a148c20..e802376 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-216: Improve FilterPushDownRule and Implement physical operators 
+    for outer join. (camelia_c via hyunsik)
+
     TAJO-211: Implement regexp_replace function. (hyunsik)
 
     TAJO-212: Implement type cast expresion. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
index 9ce9089..651b00b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.datum;
 
+import com.google.common.primitives.UnsignedBytes;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.datum.exception.InvalidOperationException;
 
@@ -135,10 +136,14 @@ public class CharDatum extends Datum {
   @Override
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
-    case CHAR:
-      return DatumFactory.createBool(this.equals(datum));
-    default:
-      throw new InvalidOperationException(datum.type());
+      case CHAR:
+        return DatumFactory.createBool(this.equals(datum));
+
+      case NULL:
+        return DatumFactory.createBool(false);
+
+      default:
+        throw new InvalidOperationException();
     }
   }
   
@@ -147,9 +152,13 @@ public class CharDatum extends Datum {
     switch (datum.type()) {
       case CHAR:
         CharDatum other = (CharDatum) datum;
-        return this.getString().compareTo(other.getString());
-    default:
-      throw new InvalidOperationException(datum.type());
+        return UnsignedBytes.lexicographicalComparator().compare(bytes, other.bytes);
+
+      case NULL:
+        return -1;
+
+      default:
+        throw new InvalidOperationException();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
index 97e608a..8de20ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
@@ -33,7 +33,7 @@ public class Float4Datum extends Datum implements NumericDatum {
 	public Float4Datum() {
 		super(TajoDataTypes.Type.FLOAT4);
 	}
-	
+
 	public Float4Datum(float val) {
 		this();
 		this.val = val;
@@ -44,7 +44,7 @@ public class Float4Datum extends Datum implements NumericDatum {
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getFloat();
   }
-	
+
 	public boolean asBool() {
 		throw new InvalidCastException();
 	}
@@ -53,7 +53,7 @@ public class Float4Datum extends Datum implements NumericDatum {
   public char asChar() {
     return asChars().charAt(0);
   }
-	
+
 	@Override
 	public short asInt2() {
 		return (short) val;
@@ -105,7 +105,7 @@ public class Float4Datum extends Datum implements NumericDatum {
   public int size() {
     return size;
   }
-  
+
   @Override
   public int hashCode() {
     return (int) val;
@@ -117,73 +117,87 @@ public class Float4Datum extends Datum implements NumericDatum {
       Float4Datum other = (Float4Datum) obj;
       return val == other.val;
     }
-    
+
     return false;
   }
 
   @Override
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
-    case INT2:
-      return DatumFactory.createBool(val == datum.asInt2());
-    case INT4:
-      return DatumFactory.createBool(val == datum.asInt4());
-    case INT8:
-      return DatumFactory.createBool(val == datum.asInt8());
-    case FLOAT4:
-      return DatumFactory.createBool(val == datum.asFloat4());
-    case FLOAT8:
-      return DatumFactory.createBool(val == datum.asFloat8());
-    default:
-      throw new InvalidOperationException(datum.type());
+      case INT2:
+        return DatumFactory.createBool(val == datum.asInt2());
+      case INT4:
+        return DatumFactory.createBool(val == datum.asInt4());
+      case INT8:
+        return DatumFactory.createBool(val == datum.asInt8());
+      case FLOAT4:
+        return DatumFactory.createBool(val == datum.asFloat4());
+      case FLOAT8:
+        return DatumFactory.createBool(val == datum.asFloat8());
+      case NULL:
+        return DatumFactory.createBool(false);
+      default:
+        throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -200,6 +214,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -218,6 +234,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -236,6 +254,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -254,6 +274,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -272,6 +294,8 @@ public class Float4Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val / datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val / datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }
@@ -279,6 +303,6 @@ public class Float4Datum extends Datum implements NumericDatum {
 
   @Override
   public void inverseSign() {
-    this.val = - val;    
+    this.val = - val;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
index d531942..1857929 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
@@ -28,10 +28,16 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Float8Datum extends Datum implements NumericDatum {
   private static final int size = 8;
   @Expose private double val;
 
+  private static final Log LOG = LogFactory.getLog(Float8Datum.class);
+
 	public Float8Datum() {
 		super(TajoDataTypes.Type.FLOAT8);
 	}
@@ -126,56 +132,70 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
-      throw new InvalidOperationException(datum.type());
+      throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8:{
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -192,6 +212,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -210,6 +232,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -228,6 +252,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -246,6 +272,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -264,6 +292,8 @@ public class Float8Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat8(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
index b34031f..000a107 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
@@ -25,10 +25,17 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Int2Datum extends Datum implements NumericDatum {
   private static final int size = 2;  
   @Expose private short val;
 
+  private static final Log LOG = LogFactory.getLog(Int2Datum.class);
+
   public Int2Datum() {
     super(TajoDataTypes.Type.INT2);
   }
@@ -124,56 +131,70 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
-      throw new InvalidOperationException(datum.type());
+      throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -190,6 +211,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -208,6 +231,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -226,6 +251,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -244,6 +271,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -262,6 +291,8 @@ public class Int2Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
index 1422187..a68a5f1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
@@ -20,12 +20,12 @@ package org.apache.tajo.datum;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.exception.InvalidCastException;
 import org.apache.tajo.datum.exception.InvalidOperationException;
 import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
 public class Int4Datum extends Datum implements NumericDatum {
   private static final int size = 4;
   @Expose private int val;
@@ -129,6 +129,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
       throw new InvalidOperationException();
     }
@@ -137,48 +139,60 @@ public class Int4Datum extends Datum implements NumericDatum {
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4:{
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -195,6 +209,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -213,6 +229,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -231,6 +249,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -249,6 +269,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -267,6 +289,8 @@ public class Int4Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
index 689adbc..7e8bd27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
@@ -26,6 +26,11 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Int8Datum extends Datum implements NumericDatum {
   private static final int size = 8;
   @Expose private long val;
@@ -135,56 +140,70 @@ public class Int8Datum extends Datum implements NumericDatum {
         return DatumFactory.createBool(val == datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createBool(val == datum.asFloat8());
+      case NULL:
+        return DatumFactory.createBool(false);
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8:{
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -201,6 +220,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -219,6 +240,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -237,6 +260,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -255,6 +280,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -273,6 +300,8 @@ public class Int8Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat8(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
index 10c35a9..2cce626 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
@@ -104,4 +104,4 @@ public class NullDatum extends Datum {
   public int hashCode() {
     return 0; // one of the prime number
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index bab99a2..a155f1a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.datum;
 
+import com.google.common.primitives.UnsignedBytes;
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.tajo.common.TajoDataTypes;
@@ -26,6 +27,9 @@ import org.apache.tajo.datum.exception.InvalidOperationException;
 
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public class TextDatum extends Datum {
   @Expose private int size;
   @Expose private byte [] bytes;
@@ -102,11 +106,14 @@ public class TextDatum extends Datum {
   public int compareTo(Datum datum) {
     switch (datum.type()) {
       case TEXT:
-        byte[] o = datum.asByteArray();
-        return WritableComparator.compareBytes(this.bytes, 0, this.bytes.length,
-            o, 0, o.length);
+      case CHAR:
+      case BLOB:
+        return UnsignedBytes.lexicographicalComparator().compare(bytes, datum.asByteArray());
+
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -114,7 +121,7 @@ public class TextDatum extends Datum {
   public boolean equals(Object obj) {
     if (obj instanceof TextDatum) {
       TextDatum o = (TextDatum) obj;
-      return Arrays.equals(this.bytes, o.bytes);
+      return UnsignedBytes.lexicographicalComparator().compare(this.bytes, o.bytes) == 0;
     }
 
     return false;
@@ -124,10 +131,14 @@ public class TextDatum extends Datum {
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
       case TEXT:
-        return DatumFactory.createBool(
-            Arrays.equals(this.bytes, datum.asByteArray()));
+      case CHAR:
+      case BLOB:
+        return DatumFactory.createBool(UnsignedBytes.lexicographicalComparator()
+            .compare(bytes, datum.asByteArray()) == 0);
+      case NULL:
+        return DatumFactory.createBool(false);
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 8110a3b..c6e4695 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -57,6 +57,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
 
+  final long threshold = 1048576 * 128; // 128MB
+
 
   public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
     this.conf = conf;
@@ -194,15 +196,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     switch (joinNode.getJoinType()) {
       case CROSS:
-        LOG.info("The planner chooses [Nested Loop Join]");
         return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
 
       case INNER:
         return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
 
-      case FULL_OUTER:
       case LEFT_OUTER:
+        return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_OUTER:
+        return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case FULL_OUTER:
+        return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_SEMI:
       case RIGHT_SEMI:
@@ -215,6 +221,171 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+        case NESTED_LOOP_JOIN:
+          //the right operand is too large, so we opt for NL implementation of left outer join
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+          return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long rightTableVolume = estimateSizeRecursive(context, rightLineage);
+
+    if (rightTableVolume < threshold) {
+      // we can implement left outer join using hash join, using the right operand as the build relation
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+    }
+    else {
+      //the right operand is too large, so we opt for NL implementation of left outer join
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+      return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
+    // blocking, but merge join is blocking as well)
+    String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+    long outerSize4 = estimateSizeRecursive(context, outerLineage4);
+    if (outerSize4 < threshold){
+      LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+    } else {
+      return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    //the left operand is too large, so opt for merge join implementation
+    LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+    SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort2 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs2[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+    ExternalSortExec innerSort2 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs2[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+    return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
+  }
+
+  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+        case MERGE_JOIN:
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+
+        case MERGE_JOIN:
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+
+        default:
+          LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                            PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
+
+    PhysicalExec selectedRight;
+    PhysicalExec selectedLeft;
+
+    // HashJoinExec loads the smaller relation to memory.
+    if (outerSize2 <= innerSize2) {
+      selectedLeft = leftExec;
+      selectedRight = rightExec;
+    } else {
+      selectedLeft = rightExec;
+      selectedRight = leftExec;
+    }
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
+    return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+  }
+
+  private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                              PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    // if size too large, full outer merge join implementation
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
+    SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
+        leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort3 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs3[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+    ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+
+    return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+  }
+
+  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
+    final long threshold = 1048576 * 128;
+    if (outerSize2 < threshold || innerSize2 < threshold) {
+      return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+    } else {
+      return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
   private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
@@ -358,8 +529,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new StoreTableExec(ctx, plan, subOp);
   }
 
-  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
-      throws IOException {
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
     Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
         "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
new file mode 100644
index 0000000..aebfdb5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class FullOuterHashJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporary tuples and states for hash join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private Map<Tuple, Boolean> matched;
+
+  public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
+    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
+    this.matched = new HashMap<Tuple, Boolean>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
+        outer.getSchema(), inner.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = outer.getSchema().getColumnNum();
+    rightNumCols = inner.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple getNextUnmatchedRight() {
+
+    List<Tuple> newValue;
+    Tuple returnedTuple;
+    // get a key tuple from the matched hashmap whose flag is still false
+    for(Tuple aKeyTuple : matched.keySet()) {
+      if(matched.get(aKeyTuple) == false) {
+        newValue = tupleSlots.get(aKeyTuple);
+        returnedTuple = newValue.remove(0);
+        tupleSlots.put(aKeyTuple, newValue);
+
+        // after taking the last element from the list in tupleSlots, set flag true in matched as well
+        if(newValue.isEmpty()){
+          matched.put(aKeyTuple, true);
+        }
+
+        return returnedTuple;
+      }
+    }
+    return null;
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          // at this stage we can begin outputting the unmatched tuples from the right operand (still held in tupleSlots), null-padded on the left side
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if( unmatchedRightTuple == null) {
+            finished = true;
+            outTuple = null;
+            return null;
+          } else {
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+
+            return outTuple;
+          }
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple doesn't have a match on the right, but this is a full outer join => we should keep it anyway
+          //output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
+        matched.put(leftKeyTuple, true);
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+        matched.put(keyTuple,false);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
new file mode 100644
index 0000000..2c11fea
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  // temporary tuples and states for merge join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private Tuple leftNext = null;
+
+  private final List<Tuple> leftTupleSlots;
+  private final List<Tuple> rightTupleSlots;
+
+  private JoinTupleComparator joincomparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  private boolean end = false;
+
+  // projection
+  private final Projector projector;
+  private final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  boolean endInPopulationStage = false;
+  private boolean initRightDone = false;
+
+  public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = this.joinQual.newContext();
+
+    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = leftSortKey;
+    sortSpecs[1] = rightSortKey;
+
+    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+        rightChild.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      boolean newRound = false;
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if(newRound == true){
+
+        if (end) {
+
+          ////////////////////////////////////////////////////////////////////////
+          // FINALIZING STAGE
+          ////////////////////////////////////////////////////////////////////////
+          // the finalizing stage, where remaining tuples on the right are
+          // transformed into left-padded results while tuples on the left
+          // are transformed into right-padded results
+
+          // before exit, a left-padded tuple should be built for all remaining
+          // right side and a right-padded tuple should be built for all remaining
+          // left side
+
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if((leftTuple == null) && (rightTuple == null)) {
+            return null;
+          }
+
+          if((leftTuple == null) && (rightTuple != null)){
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            rightTuple = rightChild.next();
+            return outTuple;
+          }
+
+          if((leftTuple != null) && (rightTuple == null)){
+            // output the leftTuple with a null-padded right side
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            leftTuple = leftChild.next();
+            return outTuple;
+          }
+        } // if end
+
+        ////////////////////////////////////////////////////////////////////////
+        // INITIALIZING STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // initializing stage, reading the first tuple on each side
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+          if( leftTuple == null){
+            end = true;
+            continue;
+          }
+        }
+        if (rightTuple == null) {
+          rightTuple = rightChild.next();
+          initRightDone = true;
+          if (rightTuple == null) {
+            end = true;
+            continue;
+          }
+        }
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        rightTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+        ////////////////////////////////////////////////////////////////////////
+        // Comparison and Move Forward Stage
+        ////////////////////////////////////////////////////////////////////////
+        // advance alternatively on each side until a match is found
+        int cmp;
+        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          if (cmp > 0) {
+
+            //before getting a new tuple from the right,  a leftnullpadded tuple should be built
+            //output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // before getting a new tuple from the left,  a rightnullpadded tuple should be built
+            // output a tuple with the nulls padded rightTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            // BEFORE RETURN, MOVE FORWARD
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } // if (cmp < 0)
+        } //while
+
+
+        ////////////////////////////////////////////////////////////////////////
+        // SLOTS POPULATION STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // once a match is found, retain all tuples with this key in tuple slots
+        // on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endLeft = false;
+          boolean endRight = false;
+
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              endLeft = true;
+            }
+
+
+          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+
+          previous = new VTuple(rightTuple);
+          do {
+            rightTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endRight = true;
+            }
+
+          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          if ((endLeft == true) || (endRight == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+
+        } // if end false
+      } // if newRound
+
+
+      ////////////////////////////////////////////////////////////////////////
+      // RESULTS STAGE
+      ////////////////////////////////////////////////////////////////////////
+      // now output result matching tuples from the slots
+      // if we haven't reached the end on either side yet, or we did reach end
+      // on one(or both) sides but that happened in the slots population step
+      // (i.e. refers to next round)
+      if(!end || (end && endInPopulationStage)){
+        if(posLeftTupleSlots == 0){
+          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+          frameTuple.set(leftNext, aTuple);
+          joinQual.eval(qualCtx, inSchema, frameTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        } else {
+          // right (inner) slots reached end and should be rewound if there are still tuples in the outer slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(leftNext, aTuple);
+            joinQual.eval(qualCtx, inSchema, frameTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            return outTuple;
+          }
+        }
+      } // the second if end false
+    } // for
+  }
+
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    posRightTupleSlots = -1;
+    posLeftTupleSlots = -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
new file mode 100644
index 0000000..bbf39dd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.tajo.datum.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class LeftOuterHashJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private static final Log LOG = LogFactory.getLog(LeftOuterHashJoinExec.class);
+
+  public LeftOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                               PhysicalExec rightChild) {
+    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+        plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          finished = true;
+          return null;
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
new file mode 100644
index 0000000..cc7b331
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+/**
+ * Left outer join executed as a nested loop join.
+ *
+ * <p>For every left tuple the right child is scanned from start to end. Each right tuple that
+ * satisfies the join qualifier yields one output row; if the scan ends without any such
+ * right tuple, one row is emitted with the right side null padded. The right child is
+ * rescanned after each complete pass. When the plan carries no join qualifier, every
+ * left/right pair is treated as a match.
+ *
+ * <p>The tuple returned by {@link #next()} is a single instance that is overwritten on every
+ * call; callers that need to retain a row must copy it.
+ */
+public class LeftOuterNLJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  // both stay null when the plan has no join qualifier
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  // temporal tuples and states for nested loop join
+  // true when the right scan for the current left tuple is over and a new left tuple is due
+  private boolean needNextLeftTuple;
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+
+  // projection
+  private final EvalContext [] evalContexts;
+  private final Projector projector;
+
+  // true once the current left tuple has produced at least one output row
+  private boolean foundAtLeastOneMatch;
+  private int rightNumCols;
+
+  /**
+   * @param context    the task attempt context handed to the parent operator
+   * @param plan       the logical join node; its schemas, optional join qualifier and targets
+   *                   are used
+   * @param leftChild  the operator producing the left (preserved) side
+   * @param rightChild the operator producing the right side, rescanned once per left tuple
+   */
+  public LeftOuterNLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                             PhysicalExec rightChild) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+      this.qualCtx = this.joinQual.newContext();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    evalContexts = projector.renew();
+
+    // for join
+    needNextLeftTuple = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    foundAtLeastOneMatch = false;
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  /**
+   * @return the logical join node this operator was built from
+   */
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Produces the next joined row.
+   *
+   * @return the shared output tuple holding the next row, or {@code null} when the left child
+   *         has no more tuples
+   * @throws IOException if reading from or rescanning a child operator fails
+   */
+  @Override
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNextLeftTuple) {
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          return null;
+        }
+        needNextLeftTuple = false;
+        // a new tuple from the left child has initially no matches on the right operand
+        foundAtLeastOneMatch = false;
+      }
+      rightTuple = rightChild.next();
+
+      if (rightTuple == null) {
+        // the right scan for this left tuple is over: move on to the next left tuple
+        needNextLeftTuple = true;
+        if (foundAtLeastOneMatch) {
+          rightChild.rescan();
+          continue;
+        }
+
+        // no match at all: output the left tuple with a null padded right side
+        Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+        frameTuple.set(leftTuple, nullPaddedTuple);
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        // the null padded row counts as the one match of this left tuple
+        foundAtLeastOneMatch = true;
+        rightChild.rescan();
+        return outTuple;
+      }
+
+      frameTuple.set(leftTuple, rightTuple);
+      if (satisfiesJoinQual()) {
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        foundAtLeastOneMatch = true;
+        return outTuple;
+      }
+    }
+  }
+
+  /**
+   * Evaluates the join qualifier against the tuple pair currently set on the frame tuple.
+   *
+   * @return {@code true} when the pair is joinable; always {@code true} when the plan has no
+   *         join qualifier, since the constructor leaves it null in that case
+   */
+  private boolean satisfiesJoinQual() {
+    if (joinQual == null) {
+      return true;
+    }
+    joinQual.eval(qualCtx, inSchema, frameTuple);
+    return joinQual.terminate(qualCtx).asBool();
+  }
+
+  /**
+   * Resets the operator so that the next call to {@link #next()} starts with a fresh left
+   * tuple.
+   *
+   * @throws IOException if the parent's rescan fails
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNextLeftTuple = true;
+    foundAtLeastOneMatch = false;
+  }
+}