Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:07 UTC

[04/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
new file mode 100644
index 0000000..1dbbcf0
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashAntiJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashAntiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  @Test
+  public final void testHashAntiJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
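+    // merge the fragments of both relations so that a single task context scans employee and people together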
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace the equi-join with a hash anti-join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' should be the right (build-side) table, so the code below guarantees that ordering.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be the right (build-side) table, so the code below guarantees that ordering.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
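+    // employee empids are [0-9] and people empids are [1, 3, 5, 7, 9], so the left anti
+    // join should emit only the employee rows with empids [0, 2, 4, 6, 8].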
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result: [0, 2, 4, 6, 8]
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
new file mode 100644
index 0000000..66222da
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private final Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  @Test
+  public final void testHashInnerJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
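+    // the enforcer pins the join algorithm, so the physical planner must pick an in-memory hash join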
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+
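+    // each people empid [1, 3, 5, 7, 9] matches exactly one employee row on both join keys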
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10 / 2, count);
+  }
+
+  @Test
+  public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    TajoConf localConf = new TajoConf(conf);
+    localConf.setLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD, 100L);
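+    // with a 100-byte threshold, only the smaller relation (75 bytes) fits in memory;
+    // the larger one (140 bytes) does not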
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(localConf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+    HashJoinExec joinExec = (HashJoinExec) proj.getChild();
+
+    assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+  }
+
+  /**
+   * Checks the inner-join related functions. It returns TRUE if the left relation is
+   * smaller than the right relation.
+   *
+   * The assertions below depend on which side is smaller. This test uses two tables, p and e:
+   * the table p is 75 bytes and the table e is 140 bytes. Since we cannot know in advance
+   * which side is smaller, we use the boolean variable <code>leftSmaller</code> to indicate it.
+   */
+  private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
+                                                       PhysicalPlannerImpl phyPlanner,
+                                                       JoinNode joinNode, BinaryPhysicalExec joinExec) throws
+      IOException {
+
+    String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
+    String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
+
+    boolean leftSmaller = left[0].equals("default.p");
+
+    long leftSize = phyPlanner.estimateSizeRecursive(ctx, left);
+    long rightSize = phyPlanner.estimateSizeRecursive(ctx, right);
+
+    // The table p is 75 bytes, and the table e is 140 bytes.
+    if (leftSmaller) { // if left one is smaller
+      assertEquals(75, leftSize);
+      assertEquals(140, rightSize);
+    } else { // if right one is smaller
+      assertEquals(140, leftSize);
+      assertEquals(75, rightSize);
+    }
+
+    if (leftSmaller) {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[0], joinExec.getLeftChild());
+      assertEquals(ordered[1], joinExec.getRightChild());
+
+      assertEquals("default.p", left[0]);
+      assertEquals("default.e", right[0]);
+    } else {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[1], joinExec.getLeftChild());
+      assertEquals(ordered[0], joinExec.getRightChild());
+
+      assertEquals("default.e", left[0]);
+      assertEquals("default.p", right[0]);
+    }
+
+    if (leftSmaller) {
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    } else {
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    }
+
+    return leftSmaller;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
new file mode 100644
index 0000000..f0d846c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHashPartitioner {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testGetPartition() {   
+    Tuple tuple1 = new VTuple(3);    
+    tuple1.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(3)
+    });
+    Tuple tuple2 = new VTuple(3);    
+    tuple2.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(4)
+    });
+    Tuple tuple3 = new VTuple(3);    
+    tuple3.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(5)
+    });
+    Tuple tuple4 = new VTuple(3);    
+    tuple4.put(new Datum[] {
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(3)
+    });
+    Tuple tuple5 = new VTuple(3);    
+    tuple5.put(new Datum[] {
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(4)
+    });
+    
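+    // partition on the first two columns: tuples 1-3 share key (1, 2) and tuples 4-5
+    // share key (2, 2), so each group must map to a single partition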
+    int[] partKeys = {0, 1};
+    Partitioner p = new HashPartitioner(partKeys, 2);
+    
+    int part1 = p.getPartition(tuple1);
+    assertEquals(part1, p.getPartition(tuple2));
+    assertEquals(part1, p.getPartition(tuple3));
+    
+    int part2 = p.getPartition(tuple4);
+    assertEquals(part2, p.getPartition(tuple5));    
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
new file mode 100644
index 0000000..4e5de98
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashSemiJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashSemiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    // make 15 tuples
+    for (int i = 1; i < 10; i += 2) {
+      // make three duplicate tuples for each empid
+      for (int j = 0; j < 3; j++) {
+        tuple.put(new Datum[] {
+            DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+            DatumFactory.createInt4(10 + i),
+            DatumFactory.createText("name_" + i),
+            DatumFactory.createInt4(30 + i) });
+        appender.addTuple(tuple);
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  @Test
+  public final void testHashSemiJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace the equi-join with a hash semi-join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' should be the right (build-side) table, so the code below guarantees that ordering.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be the right (build-side) table, so the code below guarantees that ordering.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
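+    // people contains three duplicates per matching empid, but the semi join must
+    // emit each matching employee row exactly once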
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    // the result must not contain duplicate tuples.
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result: [1, 3, 5, 7, 9]
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
new file mode 100644
index 0000000..de3d298
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestLeftOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestLeftOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name    | last_name    | dep_id | salary | job_id
+    // -----------------------------------------------------------------
+    //  11     | firstname_11  | lastname_11  |  1     | 123    | 101
+    //  13     | firstname_13  | lastname_13  |  3     | 369    | 103
+    //  15     | firstname_15  | lastname_15  |  5     | 615    | null
+    //  17     | firstname_17  | lastname_17  |  7     | 861    | null
+    //  19     | firstname_19  | lastname_19  |  9     | 1107   | null
+    //  21     | firstname_21  | lastname_21  |  1     | 123    | 101
+    //  23     | firstname_23  | lastname_23  |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+    
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id",
+      // [3] the right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [4] the left operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec);
+
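+    // dep3 has 10 rows; dep_ids 1 and 3 each match two emp3 rows, dep_ids 5, 7 and 9
+    // match one each, and the other five dep_ids are null-padded, giving 12 rows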
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
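+    // job_ids 101 and 103 each match two emp3 rows and job_id 102 matches none, so the
+    // left outer join yields 4 matched rows plus one null-padded row for job 102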
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+      // for this small data set a nested-loop plan is not expected
+      fail("NLLeftOuterJoinExec should not be chosen for this data set");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count++;
+      }
+      exec.close();
+      assertEquals(5, count);
+    }
+  }
+
+  @Test
+  public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
+    
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
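+    // all 7 emp3 rows are preserved: 4 match a job3 row and the 3 rows with a null
+    // job_id are null-padded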
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+      // for this small data set a nested-loop plan is not expected
+      fail("NLLeftOuterJoinExec should not be chosen for this data set");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count++;
+      }
+      exec.close();
+      assertEquals(7, count);
+    }
+  }
+
+
+  @Test
+  public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
+    
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
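+    // phone3 is empty, so all 7 emp3 rows survive with null-padded phone columns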
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+      // for this small data set a nested-loop plan is not expected
+      fail("NLLeftOuterJoinExec should not be chosen for this data set");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count++;
+      }
+      exec.close();
+      assertEquals(7, count);
+    }
+  }
+
+  
+  @Test
+  public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
+    
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "default.emp3", emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "default.phone3", phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
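+    // phone3 is empty and is the left relation, so the join produces no rows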
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+      // for this small data set a nested-loop plan is not expected
+      fail("NLLeftOuterJoinExec should not be chosen for this data set");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count++;
+      }
+      exec.close();
+      assertEquals(0, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
new file mode 100644
index 0000000..e806e55
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestLeftOuterNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestLeftOuterNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private static final Session session = LocalTajoTestingUtility.createDummySession();
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ---------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
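+    // emp_ids 15, 17, and 19 are written with a null job_id, so in joins on job_id
+    // they can only show up as null-padded rows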
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    // ---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is intentionally empty so the tests can cover joins with an empty operand
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+    
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+  
+  String[] QUERIES = {
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id", //0 no null join keys
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id", //1 null join keys on the right operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id", //2 null join keys on the left operand
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id", //3 empty right operand
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id" //4 empty left operand
+  };
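+
+  // Expected row counts, given the fixtures above:
+  //   0: 7 matched rows + 5 null-padded dep3 rows = 12
+  //   1: 4 matched rows + 1 null-padded job3 row  = 5
+  //   2: 4 matched rows + 3 null-padded emp3 rows = 7
+  //   3: 0 matched rows + 7 null-padded emp3 rows = 7
+  //   4: the left operand phone3 is empty, so the join yields 0 rows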
+
+  @Test
+  public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may have chosen a hash join; if so, swap the HashLeftOuterJoinExec
+    // for an NLLeftOuterJoinExec so this test exercises the nested-loop implementation.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+      exec = proj;
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check the tuple contents, not just the row count
+      count++;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may have chosen a hash join; if so, swap the HashLeftOuterJoinExec
+    // for an NLLeftOuterJoinExec so this test exercises the nested-loop implementation.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+      exec = proj;
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check the tuple contents, not just the row count
+      count++;
+    }
+    exec.close();
+    assertEquals(5, count);
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec2() throws IOException, PlanningException {
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may have chosen a hash join; if so, swap the HashLeftOuterJoinExec
+    // for an NLLeftOuterJoinExec so this test exercises the nested-loop implementation.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+      exec = proj;
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check the tuple contents, not just the row count
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec3() throws IOException, PlanningException {
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may have chosen a hash join; if so, swap the HashLeftOuterJoinExec
+    // for an NLLeftOuterJoinExec so this test exercises the nested-loop implementation.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+      exec = proj;
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check the tuple contents, not just the row count
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec4() throws IOException, PlanningException {
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The planner may have chosen a hash join; if so, swap the HashLeftOuterJoinExec
+    // for an NLLeftOuterJoinExec so this test exercises the nested-loop implementation.
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      proj.setChild(aJoin);
+      exec = proj;
+    }
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check the tuple contents, not just the row count
+      count++;
+    }
+    exec.close();
+    assertEquals(0, count);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
new file mode 100644
index 0000000..0e4fd9a
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
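+    // employee rows: managerid = empid = 0..9, then the odd ids 11..19; memid = empid + 10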
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 11; i < 20; i+=2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
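+    // people rows: empid 1,3,5,7,9 and then 10..19; fk_memid = empid + 10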
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 10; i < 20; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
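+
+  // The two tables share empid values 1,3,5,7,9 and 11,13,15,17,19, and for all of
+  // them memid = fk_memid = empid + 10 holds, so the merge join should emit 10 rows.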
+
+  @Test
+  public final void testMergeInnerJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode root = plan.getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
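+    // pin the join algorithm so the physical planner must produce a MergeJoinExec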
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] empFrags = sm.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = sm.splitNG(conf, "default.p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue((10 + i) == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10, count); // the ten shared odd empids, 1 through 19
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
new file mode 100644
index 0000000..120113f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  private MasterPlan masterPlan;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("memid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
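+    // 50 employee rows: managerid = empid = 0..49, memid = empid + 10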
+    for (int i = 0; i < 50; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i)});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+    
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
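+    // 25 people rows: empid 1,3,...,49, fk_memid = empid + 10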
+    for (int i = 1; i < 50; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i)});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+
+    masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
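+    // the MasterPlan is only needed to build a QueryUnitAttemptId for testNLInnerJoin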
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+  
+  String[] QUERIES = {
+    "select managerId, e.empId, deptName, e.memId from employee as e, people p",
+    "select managerId, e.empId, deptName, e.memId from employee as e inner join people as p on " +
+        "e.empId = p.empId and e.memId = p.fk_memId"
+  };
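+
+  // Expected row counts: the cross join yields 50 * 25 = 1250 rows; the inner join
+  // matches the 25 odd empids, since memid = fk_memid = empid + 10 for each of them.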
+  
+  @Test
+  public final void testNLCrossJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+        context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(50 * 25, i); // 50 employee rows x 25 people rows
+  }
+
+  @Test
+  public final void testNLInnerJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+        context).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(50 / 2, count);
+  }
+}