You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/05/15 05:59:27 UTC

[1/2] tajo git commit: TAJO-1542 Refactoring of HashJoinExecs. (contributed by navis, committed by hyunsik)

Repository: tajo
Updated Branches:
  refs/heads/master f3acbdf5c -> 36a703c5d


http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index e20686b..9afc51f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -319,24 +319,17 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
-       //for this small data set this is not likely to happen
-      
-      assertEquals(1, 0);
-    }
-    else{
-       Tuple tuple;
-       int count = 0;
-       int i = 1;
-       exec.init();
-  
-       while ((tuple = exec.next()) != null) {
-         //TODO check contents
-         count = count + 1;
-       }
-       exec.close();
-       assertEquals(5, count);
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+
+    while ((tuple = exec.next()) != null) {
+      //TODO check contents
+      count = count + 1;
     }
+    exec.close();
+    assertEquals(5, count);
   }
 
     @Test
@@ -361,24 +354,17 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
-      //for this small data set this is not likely to happen
-      
-      assertEquals(1, 0);
-    }
-    else{
-       Tuple tuple;
-       int count = 0;
-       int i = 1;
-       exec.init();
-  
-       while ((tuple = exec.next()) != null) {
-         //TODO check contents
-         count = count + 1;
-       }
-       exec.close();
-       assertEquals(7, count);
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+
+    while ((tuple = exec.next()) != null) {
+      //TODO check contents
+      count = count + 1;
     }
+    exec.close();
+    assertEquals(7, count);
   }
 
 
@@ -403,24 +389,17 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
-      //for this small data set this is not likely to happen
-      
-      assertEquals(1, 0);
-    }
-    else{
-       Tuple tuple;
-       int count = 0;
-       int i = 1;
-       exec.init();
-  
-       while ((tuple = exec.next()) != null) {
-         //TODO check contents
-         count = count + 1;
-       }
-       exec.close();
-       assertEquals(7, count);
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+
+    while ((tuple = exec.next()) != null) {
+      //TODO check contents
+      count = count + 1;
     }
+    exec.close();
+    assertEquals(7, count);
   }
 
   
@@ -445,22 +424,15 @@ public class TestLeftOuterHashJoinExec {
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof NLLeftOuterJoinExec) {
-      //for this small data set this is not likely to happen
-      
-      assertEquals(1, 0);
-    }
-    else{
-       int count = 0;
-       exec.init();
-  
-       while (exec.next() != null) {
-         //TODO check contents
-         count = count + 1;
-       }
-       exec.close();
-       assertEquals(0, count);
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
     }
+    exec.close();
+    assertEquals(0, count);
   }
   
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
deleted file mode 100644
index 10fd3d4..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestLeftOuterNLJoinExec {
-  private TajoConf conf;
-  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuterNLJoinExec";
-  private TajoTestingCluster util;
-  private CatalogService catalog;
-  private SQLAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private QueryContext defaultContext;
-  private Path testDir;
-
-  private TableDesc dep3;
-  private TableDesc job3;
-  private TableDesc emp3;
-  private TableDesc phone3;
-
-  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
-  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
-  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
-  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
-
-  @Before
-  public void setUp() throws Exception {
-    util = new TajoTestingCluster();
-    catalog = util.startCatalogCluster().getCatalog();
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
-    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    conf = util.getConfiguration();
-
-    //----------------- dep3 ------------------------------
-    // dep_id | dep_name  | loc_id
-    //--------------------------------
-    //  0     | dep_0     | 1000
-    //  1     | dep_1     | 1001
-    //  2     | dep_2     | 1002
-    //  3     | dep_3     | 1003
-    //  4     | dep_4     | 1004
-    //  5     | dep_5     | 1005
-    //  6     | dep_6     | 1006
-    //  7     | dep_7     | 1007
-    //  8     | dep_8     | 1008
-    //  9     | dep_9     | 1009
-    Schema dep3Schema = new Schema();
-    dep3Schema.addColumn("dep_id", Type.INT4);
-    dep3Schema.addColumn("dep_name", Type.TEXT);
-    dep3Schema.addColumn("loc_id", Type.INT4);
-
-
-    TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
-    Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
-        .getAppender(dep3Meta, dep3Schema, dep3Path);
-    appender1.init();
-    Tuple tuple = new VTuple(dep3Schema.size());
-    for (int i = 0; i < 10; i++) {
-      tuple.put(new Datum[] { DatumFactory.createInt4(i),
-                    DatumFactory.createText("dept_" + i),
-                    DatumFactory.createInt4(1000 + i) });
-      appender1.addTuple(tuple);
-    }
-
-    appender1.flush();
-    appender1.close();
-    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
-    catalog.createTable(dep3);
-
-    //----------------- job3 ------------------------------
-    //  job_id  | job_title
-    // ----------------------
-    //   101    |  job_101
-    //   102    |  job_102
-    //   103    |  job_103
-
-    Schema job3Schema = new Schema();
-    job3Schema.addColumn("job_id", Type.INT4);
-    job3Schema.addColumn("job_title", Type.TEXT);
-
-
-    TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
-    Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
-        .getAppender(job3Meta, job3Schema, job3Path);
-    appender2.init();
-    Tuple tuple2 = new VTuple(job3Schema.size());
-    for (int i = 1; i < 4; i++) {
-      int x = 100 + i;
-      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
-                    DatumFactory.createText("job_" + x) });
-      appender2.addTuple(tuple2);
-    }
-
-    appender2.flush();
-    appender2.close();
-    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
-    catalog.createTable(job3);
-
-
-
-    //---------------------emp3 --------------------
-    // emp_id  | first_name | last_name | dep_id | salary | job_id
-    // ------------------------------------------------------------
-    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
-    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
-    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
-    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
-    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
-    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
-    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
-
-    Schema emp3Schema = new Schema();
-    emp3Schema.addColumn("emp_id", Type.INT4);
-    emp3Schema.addColumn("first_name", Type.TEXT);
-    emp3Schema.addColumn("last_name", Type.TEXT);
-    emp3Schema.addColumn("dep_id", Type.INT4);
-    emp3Schema.addColumn("salary", Type.FLOAT4);
-    emp3Schema.addColumn("job_id", Type.INT4);
-
-
-    TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
-    Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
-        .getAppender(emp3Meta, emp3Schema, emp3Path);
-    appender3.init();
-    Tuple tuple3 = new VTuple(emp3Schema.size());
-
-    for (int i = 1; i < 4; i += 2) {
-      int x = 10 + i;
-      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
-          DatumFactory.createText("firstname_" + x),
-          DatumFactory.createText("lastname_" + x),
-          DatumFactory.createInt4(i),
-          DatumFactory.createFloat4(123 * i),
-          DatumFactory.createInt4(100 + i) });
-      appender3.addTuple(tuple3);
-
-      int y = 20 + i;
-      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
-          DatumFactory.createText("firstname_" + y),
-          DatumFactory.createText("lastname_" + y),
-          DatumFactory.createInt4(i),
-          DatumFactory.createFloat4(123 * i),
-          DatumFactory.createInt4(100 + i) });
-      appender3.addTuple(tuple3);
-    }
-
-    for (int i = 5; i < 10; i += 2) {
-      int x = 10 + i;
-      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
-          DatumFactory.createText("firstname_" + x),
-          DatumFactory.createText("lastname_" + x),
-          DatumFactory.createInt4(i),
-          DatumFactory.createFloat4(123 * i),
-          DatumFactory.createNullDatum() });
-      appender3.addTuple(tuple3);
-    }
-
-    appender3.flush();
-    appender3.close();
-    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
-    catalog.createTable(emp3);
-
-    // ---------------------phone3 --------------------
-    // emp_id  | phone_number
-    // -----------------------------------------------
-    // this table is empty, no rows
-
-    Schema phone3Schema = new Schema();
-    phone3Schema.addColumn("emp_id", Type.INT4);
-    phone3Schema.addColumn("phone_number", Type.TEXT);
-
-
-    TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
-    Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
-        .getAppender(phone3Meta, phone3Schema, phone3Path);
-    appender5.init();
-    
-    appender5.flush();
-    appender5.close();
-    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
-    catalog.createTable(phone3);
-
-    analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
-
-    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    util.shutdownCatalogCluster();
-  }
-  
-  String[] QUERIES = {
-      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id", //0 no nulls
-      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id", //1 nulls on the right operand
-      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id", //2 nulls on the left side
-      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id", //3 one operand is empty
-      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id" //4 one operand is empty
-  };
-
-  @Test
-  public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
-    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
-        Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
-        Integer.MAX_VALUE);
-
-    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
-
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuterNLJoinExec0");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context =  analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
-
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-
-    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
-    ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = proj.getChild();
-      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
-      proj.setChild(aJoin);
-      exec = proj;
-    }
-
-    int count = 0;
-    exec.init();
-    while (exec.next() != null) {
-       //TODO check contents
-         count = count + 1;
-    }
-    assertNull(exec.next());
-    exec.close();
-    assertEquals(12, count);
-  }
-
-
-  @Test
-  public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
-    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
-        Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
-        Integer.MAX_VALUE);
-
-    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
-
-
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_NLJoinExec1");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context =  analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
-
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    
-    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
-    ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = proj.getChild();
-      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
-      proj.setChild(aJoin);
-      exec = proj;
-     
-    }
-
-
-    Tuple tuple;
-    int i = 1;
-    int count = 0;
-    exec.init();
-    while ((tuple = exec.next()) != null) {
-       //TODO check contents
-         count = count + 1;
-      
-    }
-    exec.close();
-    assertEquals(5, count);
-  }
-
-  @Test
-  public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
-        Integer.MAX_VALUE);
-    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
-        Integer.MAX_VALUE);
-
-    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
-
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_NLJoinExec2");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context =  analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
-
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    
-    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
-    ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = proj.getChild();
-      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
-      proj.setChild(aJoin);
-      exec = proj;
-     
-    }
-
-
-    Tuple tuple;
-    int i = 1;
-    int count = 0;
-    exec.init();
-    while ((tuple = exec.next()) != null) {
-       //TODO check contents
-         count = count + 1;
-      
-    }
-    exec.close();
-    assertEquals(7, count);
-  }
-
-
-  @Test
-  public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
-        Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
-        Integer.MAX_VALUE);
-
-    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
-
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_NLJoinExec3");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context =  analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
-
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    
-    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
-    ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = proj.getChild();
-      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
-      proj.setChild(aJoin);
-      exec = proj;
-     
-    }
-
-
-    Tuple tuple;
-    int i = 1;
-    int count = 0;
-    exec.init();
-    while ((tuple = exec.next()) != null) {
-       //TODO check contents
-         count = count + 1;
-      
-    }
-    exec.close();
-    assertEquals(7, count);
-  }
-
-    @Test
-  public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
-        Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
-        Integer.MAX_VALUE);
-
-    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
-
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_NLJoinExec4");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context =  analyzer.parse(QUERIES[4]);
-    LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
-
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    
-    //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
-    ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = proj.getChild();
-      NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
-      proj.setChild(aJoin);
-      exec = proj;
-     
-    }
-
-
-    Tuple tuple;
-    int i = 1;
-    int count = 0;
-    exec.init();
-    while ((tuple = exec.next()) != null) {
-       //TODO check contents
-         count = count + 1;
-      
-    }
-    exec.close();
-    assertEquals(0, count);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinFilterOfRowPreservedTable1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinFilterOfRowPreservedTable1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinFilterOfRowPreservedTable1.sql
index 66274d7..50ea371 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinFilterOfRowPreservedTable1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinFilterOfRowPreservedTable1.sql
@@ -5,4 +5,4 @@ select
   n_regionkey
 from
   region left outer join nation on n_regionkey = r_regionkey and r_name in ('AMERICA', 'ASIA')
-order by r_name;
\ No newline at end of file
+order by r_name,n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/resources/results/TestJoinQuery/testJoinFilterOfRowPreservedTable1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testJoinFilterOfRowPreservedTable1.result b/tajo-core/src/test/resources/results/TestJoinQuery/testJoinFilterOfRowPreservedTable1.result
index 82d5562..d489e3e 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testJoinFilterOfRowPreservedTable1.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testJoinFilterOfRowPreservedTable1.result
@@ -6,10 +6,10 @@ AMERICA,1,BRAZIL,1
 AMERICA,1,CANADA,1
 AMERICA,1,PERU,1
 AMERICA,1,UNITED STATES,1
+ASIA,2,CHINA,2
 ASIA,2,INDIA,2
 ASIA,2,INDONESIA,2
 ASIA,2,JAPAN,2
-ASIA,2,CHINA,2
 ASIA,2,VIETNAM,2
 EUROPE,3,null,null
 MIDDLE EAST,4,null,null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
index cfcc829..33d6565 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
@@ -78,7 +78,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
   }
 
   public void merge(FunctionContext context, Tuple tuple) {
-    if (!isBinded) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before merge()");
     }
     mergeParam(context, evalParams(tuple));
@@ -99,7 +99,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
   }
 
   public Datum terminate(FunctionContext context) {
-    if (!isBinded) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before terminate()");
     }
     if (!finalPhase) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
index 6cf7272..c6b7354 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
@@ -21,6 +21,7 @@ package org.apache.tajo.plan.expr;
 import org.apache.tajo.catalog.Column;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
@@ -327,6 +328,10 @@ public class AlgebraicUtil {
         (expr.getType() == EvalType.LIKE && !((LikePredicateEval)expr).isLeadingWildCard());
   }
 
+  public static EvalNode createSingletonExprFromCNF(Collection<EvalNode> cnfExprs) {
+    return createSingletonExprFromCNF(cnfExprs.toArray(new EvalNode[cnfExprs.size()]));
+  }
+
   /**
    * Convert a list of conjunctive normal forms into a singleton expression.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
index 9abb0bc..b154532 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
@@ -35,43 +35,44 @@ import org.apache.tajo.storage.Tuple;
  * It is also used for evaluation.
  */
 public abstract class EvalNode implements Cloneable, GsonObject, ProtoObject<PlanProto.EvalNodeTree> {
-	@Expose protected EvalType type;
-  protected boolean isBinded = false;
+  @Expose
+  protected EvalType type;
+  protected transient boolean isBound;
 
   public EvalNode() {
   }
 
-	public EvalNode(EvalType type) {
-		this.type = type;
-	}
-	
-	public EvalType getType() {
-		return this.type;
-	}
-	
-	public abstract DataType getValueType();
+  public EvalNode(EvalType type) {
+    this.type = type;
+  }
+
+  public EvalType getType() {
+    return this.type;
+  }
+
+  public abstract DataType getValueType();
 
   public abstract int childNum();
 
   public abstract EvalNode getChild(int idx);
-	
-	public abstract String getName();
+
+  public abstract String getName();
 
   @Override
-	public String toJson() {
+  public String toJson() {
     return PlanGsonHelper.toJson(this, EvalNode.class);
-	}
+  }
 
   public EvalNode bind(@Nullable EvalContext evalContext, Schema schema) {
     for (int i = 0; i < childNum(); i++) {
       getChild(i).bind(evalContext, schema);
     }
-    isBinded = true;
+    isBound = true;
     return this;
   }
 
-	public <T extends Datum> T eval(Tuple tuple) {
-    if (!isBinded) {
+  public <T extends Datum> T eval(Tuple tuple) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before eval()");
     }
     return null;
@@ -87,7 +88,7 @@ public abstract class EvalNode implements Cloneable, GsonObject, ProtoObject<Pla
   public Object clone() throws CloneNotSupportedException {
     EvalNode evalNode = (EvalNode) super.clone();
     evalNode.type = type;
-    evalNode.isBinded = isBinded;
+    evalNode.isBound = isBound;
     return evalNode;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/InEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/InEval.java
index c968bda..7052663 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/InEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/InEval.java
@@ -58,7 +58,7 @@ public class InEval extends BinaryEval {
 
   @Override
   public Datum eval(Tuple tuple) {
-    if (!isBinded) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before eval()");
     }
     if (values == null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
index cdd8dfb..ec143f7 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
@@ -82,7 +82,7 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
 
   @Override
   public Datum eval(Tuple tuple) {
-    if (!isBinded) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before eval()");
     }
     Datum predicand = leftExpr.eval(tuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
index a39d303..e5b88f2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
@@ -64,7 +64,7 @@ public class WindowFunctionEval extends AggregationFunctionCallEval implements C
 
   @Override
   public Datum terminate(FunctionContext context) {
-    if (!isBinded) {
+    if (!isBound) {
       throw new IllegalStateException("bind() must be called before terminate()");
     }
     return functionInvoke.terminate(context);

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
index 8b7e2e0..a5561ed 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -30,7 +30,7 @@ import org.apache.tajo.exception.UnsupportedException;
 /**
  * An instance of FrameTuple is an immutable tuple.
  * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluatations.
+ * join qual evaluations.
  */
 public class FrameTuple implements Tuple, Cloneable {
   private int size;
@@ -52,6 +52,18 @@ public class FrameTuple implements Tuple, Cloneable {
     this.right = right;
   }
 
+  public FrameTuple setLeft(Tuple left) {
+    this.left = left;
+    this.leftSize = left.size();
+    return this;
+  }
+
+  public FrameTuple setRight(Tuple right) {
+    this.right = right;
+    this.size = leftSize + right.size();
+    return this;
+  }
+
   @Override
   public int size() {
     return size;


[2/2] tajo git commit: TAJO-1542 Refactoring of HashJoinExecs. (contributed by navis, committed by hyunsik)

Posted by hy...@apache.org.
TAJO-1542 Refactoring of HashJoinExecs. (contributed by navis, committed by hyunsik)

Closes #529 #567


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36a703c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36a703c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36a703c5

Branch: refs/heads/master
Commit: 36a703c5dc2c2257dfd52232f204507fb4b79024
Parents: f3acbdf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 14 20:59:09 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 14 20:59:09 2015 -0700

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/catalog/Schema.java    |  16 +
 .../org/apache/tajo/storage/EmptyTuple.java     | 140 +-----
 .../java/org/apache/tajo/storage/NullTuple.java | 175 +++++++
 .../java/org/apache/tajo/storage/VTuple.java    |  20 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  22 +-
 .../physical/BasicPhysicalExecutorVisitor.java  |   8 -
 .../planner/physical/CommonHashJoinExec.java    | 191 ++++++++
 .../engine/planner/physical/CommonJoinExec.java | 172 ++++++-
 .../planner/physical/HashFullOuterJoinExec.java | 247 ++++------
 .../engine/planner/physical/HashJoinExec.java   | 212 +--------
 .../planner/physical/HashLeftAntiJoinExec.java  |  59 +--
 .../planner/physical/HashLeftOuterJoinExec.java | 292 +-----------
 .../planner/physical/HashLeftSemiJoinExec.java  |  48 +-
 .../planner/physical/NLLeftOuterJoinExec.java   | 101 ----
 .../physical/PhysicalExecutorVisitor.java       |   3 -
 .../physical/RightOuterMergeJoinExec.java       |  40 +-
 .../apache/tajo/engine/utils/CacheHolder.java   |   3 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   8 +-
 .../physical/TestLeftOuterHashJoinExec.java     | 104 ++--
 .../physical/TestLeftOuterNLJoinExec.java       | 474 -------------------
 .../testJoinFilterOfRowPreservedTable1.sql      |   2 +-
 .../testJoinFilterOfRowPreservedTable1.result   |   2 +-
 .../plan/expr/AggregationFunctionCallEval.java  |   4 +-
 .../apache/tajo/plan/expr/AlgebraicUtil.java    |   5 +
 .../org/apache/tajo/plan/expr/EvalNode.java     |  39 +-
 .../java/org/apache/tajo/plan/expr/InEval.java  |   2 +-
 .../plan/expr/PatternMatchPredicateEval.java    |   2 +-
 .../tajo/plan/expr/WindowFunctionEval.java      |   2 +-
 .../org/apache/tajo/storage/FrameTuple.java     |  14 +-
 30 files changed, 842 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 77be589..44ae4b4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1542: Refactoring of HashJoinExecs. (Contributed Navis, Committed by 
+    hyunsik)
+
     TAJO-1591: Change StoreType represented as Enum to String type. (hyunsik)
 
     TAJO-1452: Improve function listing order (Contributed Dongjoon Hyun, 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 0e4b741..80c4d83 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -400,6 +400,22 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
     return containFlag;
   }
 
+  /**
+   * Return TRUE if any column in <code>columns</code> is included in this schema.
+   *
+   * @param columns Columns to be checked
+   * @return true if any column in <code>columns</code> is included in this schema.
+   *         Otherwise, false.
+   */
+  public boolean containsAny(Collection<Column> columns) {
+    for (Column column : columns) {
+      if (contains(column)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public synchronized Schema addColumn(String name, TypeDesc typeDesc) {
     String normalized = name;
     if(fieldsByQualifiedName.containsKey(normalized)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
index 89e72ed..cdcebd7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java
@@ -18,17 +18,12 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
 /* This class doesn’t have content datum. if selected column is zero, this is useful
 *  e.g. select count(*) from table
 * */
-public class EmptyTuple implements Tuple, Cloneable {
+public class EmptyTuple extends NullTuple {
 
   private static EmptyTuple tuple;
-  private static Datum[] EMPTY_VALUES = new Datum[0];
 
   static {
     tuple = new EmptyTuple();
@@ -39,138 +34,11 @@ public class EmptyTuple implements Tuple, Cloneable {
   }
 
   private EmptyTuple() {
+    super(0);
   }
 
   @Override
-  public int size() {
-    return 0;
-  }
-
-  public boolean contains(int fieldId) {
-    return false;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return true;
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return false;
-  }
-
-  @Override
-  public void clear() {
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    return NullDatum.get();
-  }
-
-  @Override
-  public void setOffset(long offset) {
-
-  }
-
-  @Override
-  public long getOffset() {
-    return -1;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return NullDatum.get().asBool();
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return NullDatum.get().asByte();
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return NullDatum.get().asChar();
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    return NullDatum.get().asByteArray();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return NullDatum.get().asInt2();
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return NullDatum.get().asInt4();
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return NullDatum.get().asInt8();
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return NullDatum.get().asFloat4();
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return NullDatum.get().asFloat8();
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return NullDatum.get().asChars();
-  }
-
-  @Override
-  public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Datum getInterval(int fieldId) {
-    return NullDatum.get();
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    return NullDatum.get().asUnicodeChars();
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    throw new CloneNotSupportedException();
-  }
-
-  @Override
-  public Datum[] getValues() {
-    return EMPTY_VALUES;
+  public Tuple clone() {
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
new file mode 100644
index 0000000..45eb859
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+import java.util.Arrays;
+
+/**
+ * A tuple which contains all null datums. It is used for outer joins.
+ */
+public class NullTuple implements Tuple, Cloneable {
+
+  public static Tuple create(int size) {
+    return new NullTuple(size);
+  }
+
+  private final int size;
+
+  NullTuple(int size) {
+    this.size = size;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  public boolean contains(int fieldId) {
+    return fieldId < size;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return true;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return false;
+  }
+
+  @Override
+  public void clear() {
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    return NullDatum.get();
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return NullDatum.get().asBool();
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return NullDatum.get().asByte();
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return NullDatum.get().asChar();
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    return NullDatum.get().asByteArray();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return NullDatum.get().asInt2();
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return NullDatum.get().asInt4();
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return NullDatum.get().asInt8();
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return NullDatum.get().asFloat4();
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return NullDatum.get().asFloat8();
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return NullDatum.get().asChars();
+  }
+
+  @Override
+  public ProtobufDatum getProtobufDatum(int fieldId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Datum getInterval(int fieldId) {
+    return NullDatum.get();
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return NullDatum.get().asUnicodeChars();
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return new NullTuple(size);
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum[] datum = new Datum[size];
+    Arrays.fill(datum, NullDatum.get());
+    return datum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 5e839b7..da69eb0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -201,6 +201,7 @@ public class VTuple implements Tuple, Cloneable {
     return tuple;
   }
 
+  @Override
   public String toString() {
 		return toDisplayString(getValues());
 	}
@@ -225,22 +226,15 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   public static String toDisplayString(Datum [] values) {
-    boolean first = true;
     StringBuilder str = new StringBuilder();
-    str.append("(");
-    for(int i=0; i < values.length; i++) {
-      if(values[i] != null) {
-        if(first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-            .append("=>")
-            .append(values[i]);
+    str.append('(');
+    for (Datum datum : values) {
+      if (str.length() > 1) {
+        str.append(',');
       }
+      str.append(datum);
     }
-    str.append(")");
+    str.append(')');
     return str.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 506b03e..978dde8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -466,14 +466,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
           return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
-        case NESTED_LOOP_JOIN:
-          //the right operand is too large, so we opt for NL implementation of left outer join
-          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-          return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+        case MERGE_JOIN:
+          //the right operand is too large, so we opt for merge join implementation
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+          return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
         default:
           LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
-          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+          LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
+          return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
       }
     } else {
       return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
@@ -500,9 +500,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
     }
     else {
-      //the right operand is too large, so we opt for NL implementation of left outer join
-      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-      return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+      //the right operand is too large, so we opt for merge join implementation
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+      return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
     }
   }
 
@@ -566,7 +566,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
         default:
           LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
-          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
           return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
       }
     } else {
@@ -589,7 +589,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
         default:
           LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
-          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
           return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index 42611b0..c2d93bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -65,8 +65,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
       return visitMergeJoin(context, (MergeJoinExec) exec, stack);
     } else if (exec instanceof NLJoinExec) {
       return visitNLJoin(context, (NLJoinExec) exec, stack);
-    } else if (exec instanceof NLLeftOuterJoinExec) {
-      return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
     } else if (exec instanceof ProjectionExec) {
       return visitProjection(context, (ProjectionExec) exec, stack);
     } else if (exec instanceof RangeShuffleFileWriteExec) {
@@ -214,12 +212,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
-      throws PhysicalPlanningException {
-    return visitBinaryExecutor(context, exec, stack);
-  }
-
-  @Override
   public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitUnaryExecutor(context, exec, stack);

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
new file mode 100644
index 0000000..ff9b253
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * common exec for all hash join execs
+ *
+ * @param <T> Tuple collection type to load small relation onto in-memory
+ */
+public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
+
+  protected final List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected Map<Tuple, T> tupleSlots;
+
+  protected Iterator<Tuple> iterator;
+
+  protected final Tuple keyTuple;
+
+  protected final int rightNumCols;
+  protected final int leftNumCols;
+
+  protected final int[] leftKeyList;
+  protected final int[] rightKeyList;
+
+  protected boolean finished;
+
+  public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
+    super(context, plan, outer, inner);
+
+    // HashJoin only can manage equi join key pairs.
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
+        inner.getSchema(), false);
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    leftNumCols = outer.getSchema().size();
+    rightNumCols = inner.getSchema().size();
+
+    keyTuple = new VTuple(leftKeyList.length);
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+    if (scanExec.canBroadcast()) {
+      /* If this table can broadcast, all tasks in a node will share the same cache */
+      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+          context, scanExec.getCanonicalName(), scanExec.getFragments());
+      loadRightFromCache(key);
+    } else {
+      this.tupleSlots = convert(buildRightToHashTable(), false);
+    }
+
+    first = false;
+  }
+
+  protected void loadRightFromCache(TableCacheKey key) throws IOException {
+    ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+
+    CacheHolder<Map<Tuple, List<Tuple>>> holder;
+    synchronized (sharedResource.getLock()) {
+      if (sharedResource.hasBroadcastCache(key)) {
+        holder = sharedResource.getBroadcastCache(key);
+      } else {
+        Map<Tuple, List<Tuple>> built = buildRightToHashTable();
+        holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null);
+        sharedResource.addBroadcastCache(key, holder);
+      }
+    }
+    this.tupleSlots = convert(holder.getData(), true);
+  }
+
+  protected Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
+    Tuple tuple;
+    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
+
+    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
+      Tuple keyTuple = new VTuple(joinKeyPairs.size());
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      List<Tuple> newValue = map.get(keyTuple);
+      if (newValue == null) {
+        map.put(keyTuple, newValue = new ArrayList<Tuple>());
+      }
+      // if source is scan or groupby, it needs not to be cloned
+      newValue.add(new VTuple(tuple));
+    }
+    return map;
+  }
+
+  // todo: convert loaded data to cache condition
+  protected abstract Map<Tuple, T> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+      throws IOException;
+
+  protected Tuple toKey(final Tuple outerTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    finished = false;
+    iterator = null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    iterator = null;
+    if (tupleSlots != null) {
+      tupleSlots.clear();
+      tupleSlots = null;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (leftChild == null) {
+      return inputStats;
+    }
+    TableStats leftInputStats = leftChild.getInputStats();
+    inputStats.setNumBytes(0);
+    inputStats.setReadBytes(0);
+    inputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      inputStats.setNumBytes(leftInputStats.getNumBytes());
+      inputStats.setReadBytes(leftInputStats.getReadBytes());
+      inputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightChild.getInputStats();
+    if (rightInputStats != null) {
+      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return inputStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
index 2535edf..ec29085 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
@@ -18,36 +18,178 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.expr.AlgebraicUtil;
+import org.apache.tajo.plan.expr.BinaryEval;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
 
-// common join exec except HashLeftOuterJoinExec
+/**
+ * common exec for all join execs
+ */
 public abstract class CommonJoinExec extends BinaryPhysicalExec {
 
   // from logical plan
   protected JoinNode plan;
   protected final boolean hasJoinQual;
 
-  protected EvalNode joinQual;
+  protected EvalNode joinQual;         // ex) a.id = b.id
+  protected EvalNode leftJoinFilter;   // ex) a > 10
+  protected EvalNode rightJoinFilter;  // ex) b > 5
+
+  protected final Schema leftSchema;
+  protected final Schema rightSchema;
+
+  protected final FrameTuple frameTuple;
+  protected final Tuple outTuple;
 
   // projection
   protected Projector projector;
 
-  public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
-                        PhysicalExec inner) {
+  public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
     super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
         plan.getOutSchema(), outer, inner);
     this.plan = plan;
-    this.joinQual = plan.getJoinQual();
-    this.hasJoinQual = plan.hasJoinQual();
+    this.leftSchema = outer.getSchema();
+    this.rightSchema = inner.getSchema();
+    if (plan.hasJoinQual()) {
+      EvalNode[] extracted = extractJoinConditions(plan.getJoinQual(), leftSchema, rightSchema);
+      joinQual = extracted[0];
+      leftJoinFilter = extracted[1];
+      rightJoinFilter = extracted[2];
+    }
+    this.hasJoinQual = joinQual != null;
 
     // for projection
     this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+
+    // for join
+    this.frameTuple = new FrameTuple();
+    this.outTuple = new VTuple(outSchema.size());
+  }
+
+  /**
+   * It separates a singular CNF-formed join condition into a join condition, a left join filter, and
+   * right join filter.
+   *
+   * @param joinQual the original join condition
+   * @param leftSchema Left table schema
+   * @param rightSchema Left table schema
+   * @return Three element EvalNodes, 0 - join condition, 1 - left join filter, 2 - right join filter.
+   */
+  private EvalNode[] extractJoinConditions(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    List<EvalNode> joinQuals = Lists.newArrayList();
+    List<EvalNode> leftFilters = Lists.newArrayList();
+    List<EvalNode> rightFilters = Lists.newArrayList();
+    for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) {
+      if (!(eachQual instanceof BinaryEval)) {
+        continue; // todo 'between', etc.
+      }
+      BinaryEval binaryEval = (BinaryEval)eachQual;
+      LinkedHashSet<Column> leftColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getLeftExpr());
+      LinkedHashSet<Column> rightColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getRightExpr());
+      boolean leftInLeft = leftSchema.containsAny(leftColumns);
+      boolean rightInLeft = leftSchema.containsAny(rightColumns);
+      boolean leftInRight = rightSchema.containsAny(leftColumns);
+      boolean rightInRight = rightSchema.containsAny(rightColumns);
+
+      boolean columnsFromLeft = leftInLeft || rightInLeft;
+      boolean columnsFromRight = leftInRight || rightInRight;
+      if (!columnsFromLeft && !columnsFromRight) {
+        continue; // todo constant expression : this should be done in logical phase
+      }
+      if (columnsFromLeft ^ columnsFromRight) {
+        if (columnsFromLeft) {
+          leftFilters.add(eachQual);
+        } else {
+          rightFilters.add(eachQual);
+        }
+        continue;
+      }
+      if ((leftInLeft && rightInLeft) || (leftInRight && rightInRight)) {
+        continue; // todo not allowed yet : this should be checked in logical phase
+      }
+      joinQuals.add(eachQual);
+    }
+    return new EvalNode[] {
+        joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals),
+        leftFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(leftFilters),
+        rightFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(rightFilters)
+    };
+  }
+
+  public JoinNode getPlan() {
+    return plan;
+  }
+
+  /**
+   * Evaluate an input tuple with a left join filter
+   *
+   * @param left Tuple to be evaluated
+   * @return True if an input tuple is matched to the left join filter
+   */
+  protected boolean leftFiltered(Tuple left) {
+    return leftJoinFilter != null && !leftJoinFilter.eval(left).asBool();
+  }
+
+  /**
+   * Evaluate an input tuple with a right join filter
+   *
+   * @param right Tuple to be evaluated
+   * @return True if an input tuple is matched to the right join filter
+   */
+  protected boolean rightFiltered(Tuple right) {
+    return rightJoinFilter != null && !rightJoinFilter.eval(right).asBool();
+  }
+
+  /**
+   * Return an tuple iterator filters rows in a right table by using a join filter.
+   * It must takes rows of a right table.
+   *
+   * @param rightTuples Tuple iterator
+   * @return rows Filtered by a join filter on right table.
+   */
+  protected Iterator<Tuple> rightFiltered(Iterable<Tuple> rightTuples) {
+    if (rightTuples == null) {
+      return Iterators.emptyIterator();
+    }
+    if (rightJoinFilter == null) {
+      return rightTuples.iterator();
+    }
+    return Iterators.filter(rightTuples.iterator(), new Predicate<Tuple>() {
+      @Override
+      public boolean apply(Tuple input) {
+        return rightJoinFilter.eval(input).asBool();
+      }
+    });
+  }
+
+  /**
+   * Return an tuple iterator, containing a single NullTuple
+   *
+   * @param width the width of tuple
+   * @return an tuple iterator, containing a single NullTuple
+   */
+  protected Iterator<Tuple> nullIterator(int width) {
+    return Arrays.asList(NullTuple.create(width)).iterator();
   }
 
   @Override
@@ -56,6 +198,12 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
     if (hasJoinQual) {
       joinQual.bind(context.getEvalContext(), inSchema);
     }
+    if (leftJoinFilter != null) {
+      leftJoinFilter.bind(context.getEvalContext(), leftSchema);
+    }
+    if (rightJoinFilter != null) {
+      rightJoinFilter.bind(context.getEvalContext(), rightSchema);
+    }
   }
 
   @Override
@@ -63,10 +211,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
     if (hasJoinQual) {
       joinQual = context.getPrecompiledEval(inSchema, joinQual);
     }
-  }
-
-  public JoinNode getPlan() {
-    return plan;
+    // compile filters?
   }
 
   @Override
@@ -74,6 +219,13 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
     super.close();
     plan = null;
     joinQual = null;
+    leftJoinFilter = null;
+    rightJoinFilter = null;
     projector = null;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + " [" + leftSchema + " : " + rightSchema + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 6e28ae0..1645263 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,101 +18,59 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
 
+public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List<Tuple>>> {
 
-public class HashFullOuterJoinExec extends CommonJoinExec {
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  private int rightNumCols;
-  private int leftNumCols;
-  private Map<Tuple, Boolean> matched;
+  private boolean finalLoop; // final loop for right unmatched
 
   public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
                                PhysicalExec inner) {
     super(context, plan, outer, inner);
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
-    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
-    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
-    this.matched = new HashMap<Tuple, Boolean>(10000);
-
-    // HashJoin only can manage equi join key pairs.
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(),
-        false);
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-    leftKeyTuple = new VTuple(leftKeyList.length);
-
-    leftNumCols = outer.getSchema().size();
-    rightNumCols = inner.getSchema().size();
   }
 
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
-  }
+  public Iterator<Tuple> getUnmatchedRight() {
 
-  public Tuple getNextUnmatchedRight() {
+    return new Iterator<Tuple>() {
 
-    List<Tuple> newValue;
-    Tuple returnedTuple;
-    // get a keyTUple from the matched hashmap with a boolean false value
-    for(Tuple aKeyTuple : matched.keySet()) {
-      if(matched.get(aKeyTuple) == false) {
-        newValue = tupleSlots.get(aKeyTuple);
-        returnedTuple = newValue.remove(0);
-        tupleSlots.put(aKeyTuple, newValue);
+      private Iterator<Pair<Boolean, List<Tuple>>> iterator1 = tupleSlots.values().iterator();
+      private Iterator<Tuple> iterator2;
 
-        // after taking the last element from the list in tupleSlots, set flag true in matched as well
-        if(newValue.isEmpty()){
-          matched.put(aKeyTuple, true);
+      @Override
+      public boolean hasNext() {
+        if (hasMore()) {
+          return true;
         }
+        for (iterator2 = null; !hasMore() && iterator1.hasNext();) {
+          Pair<Boolean, List<Tuple>> next = iterator1.next();
+          if (!next.getFirst()) {
+            iterator2 = next.getSecond().iterator();
+          }
+        }
+        return hasMore();
+      }
 
-        return returnedTuple;
+      private boolean hasMore() {
+        return iterator2 != null && iterator2.hasNext();
       }
-    }
-    return null;
+
+      @Override
+      public Tuple next() {
+        return iterator2.next();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("remove");
+      }
+    };
   }
 
   public Tuple next() throws IOException {
@@ -120,112 +78,67 @@ public class HashFullOuterJoinExec extends CommonJoinExec {
       loadRightToHashTable();
     }
 
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!context.isStopped() && !finished) {
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
-          Tuple unmatchedRightTuple = getNextUnmatchedRight();
-          if( unmatchedRightTuple == null) {
-            finished = true;
-            outTuple = null;
-            return null;
-          } else {
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
-            projector.eval(frameTuple, outTuple);
-
-            return outTuple;
-          }
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
-        if (rightTuples != null) { // found right tuples on in-memory hash table.
-          iterator = rightTuples.iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
-          //output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(frameTuple, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          shouldGetLeftTuple = true;
-          return outTuple;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-
-      if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable
+    while (!context.isStopped() && !finished) {
+      if (iterator != null && iterator.hasNext()) {
+        frameTuple.setRight(iterator.next());
         projector.eval(frameTuple, outTuple);
-        found = true;
-        getKeyLeftTuple(leftTuple, leftKeyTuple);
-        matched.put(leftKeyTuple, true);
+        return outTuple;
       }
-
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
+      if (finalLoop) {
+        finished = true;
+        return null;
       }
-
-      if (found) {
-        break;
+      Tuple leftTuple = leftChild.next();
+      if (leftTuple == null) {
+      // if no more tuples in left tuples, a join is completed.
+        // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side
+        frameTuple.setLeft(NullTuple.create(leftNumCols));
+        iterator = getUnmatchedRight();
+        finalLoop = true;
+        continue;
       }
-    }
-    return outTuple;
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
+      frameTuple.setLeft(leftTuple);
 
-    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      if (leftFiltered(leftTuple)) {
+        iterator = nullIterator(rightNumCols);
+        continue;
       }
-
-      List<Tuple> newValue = tupleSlots.get(keyTuple);
-      if (newValue != null) {
-        newValue.add(tuple);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
-        matched.put(keyTuple,false);
+      // getting corresponding right
+      Pair<Boolean, List<Tuple>> hashed = tupleSlots.get(toKey(leftTuple));
+      if (hashed == null) {
+        iterator = nullIterator(rightNumCols);
+        continue;
+      }
+      Iterator<Tuple> rightTuples = rightFiltered(hashed.getSecond());
+      if (!rightTuples.hasNext()) {
+        iterator = nullIterator(rightNumCols);
+        continue;
       }
+      iterator = rightTuples;
+      hashed.setFirst(true);   // match found
     }
-    first = false;
+
+    return null;
   }
 
   @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
+  protected Map<Tuple, Pair<Boolean, List<Tuple>>> convert(Map<Tuple, List<Tuple>> hashed,
+                                                           boolean fromCache) throws IOException {
+    Map<Tuple, Pair<Boolean, List<Tuple>>> tuples = new HashMap<Tuple, Pair<Boolean, List<Tuple>>>(hashed.size());
+    for (Map.Entry<Tuple, List<Tuple>> entry : hashed.entrySet()) {
+      // flag: initially false (whether this join key had at least one match on the counter part)
+      tuples.put(entry.getKey(), new Pair<Boolean, List<Tuple>>(false, entry.getValue()));
+    }
+    return tuples;
   }
 
   @Override
-  public void close() throws IOException {
-    super.close();
-    tupleSlots.clear();
-    matched.clear();
-    tupleSlots = null;
-    matched = null;
-    iterator = null;
+  public void rescan() throws IOException {
+    super.rescan();
+    for (Pair<Boolean, List<Tuple>> value : tupleSlots.values()) {
+      value.setFirst(false);
+    }
+    finalLoop = false;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 48f3682..a4215fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,225 +18,59 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.utils.CacheHolder;
-import org.apache.tajo.engine.utils.TableCacheKey;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
 
-public class HashJoinExec extends CommonJoinExec {
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  private TableStats cachedRightTableStats;
+public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
 
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
       PhysicalExec rightExec) {
     super(context, plan, leftExec, rightExec);
-
-    // HashJoin only can manage equi join key pairs.
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
-        rightExec.getSchema(), false);
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-    leftKeyTuple = new VTuple(leftKeyList.length);
   }
 
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
+  @Override
+  protected Map<Tuple, List<Tuple>> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+      throws IOException {
+    return fromCache ? new HashMap<Tuple, List<Tuple>>(hashed) : hashed;
   }
 
+  @Override
   public Tuple next() throws IOException {
     if (first) {
       loadRightToHashTable();
     }
 
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!context.isStopped() && !finished) {
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          finished = true;
-          return null;
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
-        if (rightTuples != null) { // found right tuples on in-memory hash table.
-          iterator = rightTuples.iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          shouldGetLeftTuple = true;
-          continue;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable
+    while (!context.isStopped() && !finished) {
+      if (iterator != null && iterator.hasNext()) {
+        frameTuple.setRight(iterator.next());
         projector.eval(frameTuple, outTuple);
-        found = true;
-      }
-
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
-      }
-
-      if (found) {
-        break;
-      }
-    }
-
-    return new VTuple(outTuple);
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
-    if (scanExec.canBroadcast()) {
-      /* If this table can broadcast, all tasks in a node will share the same cache */
-      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
-          context, scanExec.getCanonicalName(), scanExec.getFragments());
-      loadRightFromCache(key);
-    } else {
-      this.tupleSlots = buildRightToHashTable();
-    }
-
-    first = false;
-  }
-
-  protected void loadRightFromCache(TableCacheKey key) throws IOException {
-    ExecutionBlockSharedResource sharedResource = context.getSharedResource();
-    synchronized (sharedResource.getLock()) {
-      if (sharedResource.hasBroadcastCache(key)) {
-        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
-        this.tupleSlots = data.getData();
-        this.cachedRightTableStats = data.getTableStats();
-      } else {
-        CacheHolder.BroadcastCacheHolder holder =
-            new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
-        sharedResource.addBroadcastCache(key, holder);
-        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
-        this.tupleSlots = data.getData();
-        this.cachedRightTableStats = data.getTableStats();
+        return outTuple;
       }
-    }
-  }
-
-  private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
-
-    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      Tuple leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = leftTuple == null;
+        continue;
       }
 
-      List<Tuple> newValue = map.get(keyTuple);
+      frameTuple.setLeft(leftTuple);
 
-      if (newValue != null) {
-        newValue.add(tuple);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        map.put(keyTuple, newValue);
+      // getting corresponding right
+      Iterable<Tuple> hashed = getRights(toKey(leftTuple));
+      Iterator<Tuple> rightTuples = rightFiltered(hashed);
+      if (rightTuples.hasNext()) {
+        iterator = rightTuples;
       }
     }
 
-    return map;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
+    return null;
   }
 
-  @Override
-  public void close() throws IOException {
-    super.close();
-    if (tupleSlots != null) {
-      tupleSlots.clear();
-      tupleSlots = null;
-    }
-
-    iterator = null;
+  private Iterable<Tuple> getRights(Tuple key) {
+    return tupleSlots.get(key);
   }
 
-  @Override
-  public TableStats getInputStats() {
-    if (leftChild == null) {
-      return inputStats;
-    }
-    TableStats leftInputStats = leftChild.getInputStats();
-    inputStats.setNumBytes(0);
-    inputStats.setReadBytes(0);
-    inputStats.setNumRows(0);
-
-    if (leftInputStats != null) {
-      inputStats.setNumBytes(leftInputStats.getNumBytes());
-      inputStats.setReadBytes(leftInputStats.getReadBytes());
-      inputStats.setNumRows(leftInputStats.getNumRows());
-    }
-
-    TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
-    if (rightInputStats != null) {
-      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
-      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
-      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
-    }
-
-    return inputStats;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 881bf84..8239270 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -19,10 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 
 import java.io.IOException;
 import java.util.List;
@@ -33,16 +31,10 @@ import java.util.List;
  * If not found, it returns the tuple of the FROM side table with null padding.
  */
 public class HashLeftAntiJoinExec extends HashJoinExec {
-  private Tuple rightNullTuple;
 
   public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
                               PhysicalExec notInSideChild) {
     super(context, plan, fromSideChild, notInSideChild);
-    // NUll Tuple
-    rightNullTuple = new VTuple(leftChild.outColumnNum);
-    for (int i = 0; i < leftChild.outColumnNum; i++) {
-      rightNullTuple.put(i, NullDatum.get());
-    }
   }
 
   /**
@@ -56,54 +48,33 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
    * @return The tuple which is unmatched to a given join condition.
    * @throws IOException
    */
+  @Override
   public Tuple next() throws IOException {
     if (first) {
       loadRightToHashTable();
     }
 
-    Tuple rightTuple;
-    boolean notFound;
-
     while(!context.isStopped() && !finished) {
-
-      // getting new outer
-      leftTuple = leftChild.next(); // it comes from a disk
-      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-        finished = true;
-        return null;
-      }
-
-      // Try to find a hash bucket in in-memory hash table
-      getKeyLeftTuple(leftTuple, leftKeyTuple);
-      List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
-      if (rightTuples != null) {
-        // if found, it gets a hash bucket from the hash table.
-        iterator = rightTuples.iterator();
-      } else {
-        // if not found, it returns a tuple.
-        frameTuple.set(leftTuple, rightNullTuple);
+      if (iterator != null && iterator.hasNext()) {
+        frameTuple.setRight(iterator.next());
         projector.eval(frameTuple, outTuple);
         return outTuple;
       }
-
-      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
-      // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
-      notFound = true;
-      while (!context.isStopped() && notFound && iterator.hasNext()) {
-        rightTuple = iterator.next();
-        frameTuple.set(leftTuple, rightTuple);
-        if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found
-          notFound = false;
-        }
+      // getting new outer
+      Tuple leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = leftTuple == null;
+        continue;
       }
 
-      if (notFound) { // if there is no matched tuple
-        frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(frameTuple, outTuple);
-        break;
+      frameTuple.setLeft(leftTuple);
+
+      // Try to find a hash bucket in in-memory hash table
+      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      if (hashed == null || !rightFiltered(hashed).hasNext()) {
+        iterator = nullIterator(0);
       }
     }
-
-    return outTuple;
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 6f573d0..8613eac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,307 +18,61 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.utils.CacheHolder;
-import org.apache.tajo.engine.utils.TableCacheKey;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.plan.expr.AlgebraicUtil;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
 
+public class HashLeftOuterJoinExec extends HashJoinExec {
 
-public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
-  // from logical plan
-  protected JoinNode plan;
-  protected EvalNode joinQual;   // ex) a.id = b.id
-  protected EvalNode joinFilter; // ex) a > 10
-
-  protected List<Column[]> joinKeyPairs;
-
-  // temporal tuples and states for nested loop join
-  protected boolean first = true;
-  protected FrameTuple frameTuple;
-  protected Tuple outTuple = null;
-  protected Map<Tuple, List<Tuple>> tupleSlots;
-  protected Iterator<Tuple> iterator = null;
-  protected Tuple leftTuple;
-  protected Tuple leftKeyTuple;
-
-  protected int [] leftKeyList;
-  protected int [] rightKeyList;
-
-  protected boolean finished = false;
-  protected boolean shouldGetLeftTuple = true;
-
-  // projection
-  protected Projector projector;
-
-  private int rightNumCols;
-  private TableStats cachedRightTableStats;
   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
 
   public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
                                PhysicalExec rightChild) {
-    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
-        plan.getOutSchema(), leftChild, rightChild);
-    this.plan = plan;
-
-    List<EvalNode> joinQuals = Lists.newArrayList();
-    List<EvalNode> joinFilters = Lists.newArrayList();
-    for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(plan.getJoinQual())) {
-      if (EvalTreeUtil.isJoinQual(eachQual, true)) {
-        joinQuals.add(eachQual);
-      } else {
-        joinFilters.add(eachQual);
-      }
-    }
-
-    this.joinQual = AlgebraicUtil.createSingletonExprFromCNF(joinQuals.toArray(new EvalNode[joinQuals.size()]));
-    if (joinFilters.size() > 0) {
-      this.joinFilter = AlgebraicUtil.createSingletonExprFromCNF(joinFilters.toArray(new EvalNode[joinFilters.size()]));
-    } else {
-      this.joinFilter = null;
-    }
-
-    // HashJoin only can manage equi join key pairs.
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
-        rightChild.getSchema(), false);
-
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
-    }
-
-    // for projection
-    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
-
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-    leftKeyTuple = new VTuple(leftKeyList.length);
-
-    rightNumCols = rightChild.getSchema().size();
-
-    joinQual.bind(context.getEvalContext(), inSchema);
-    if (joinFilter != null) {
-      joinFilter.bind(context.getEvalContext(), inSchema);
-    }
+    super(context, plan, leftChild, rightChild);
   }
 
   @Override
-  protected void compile() {
-    joinQual = context.getPrecompiledEval(inSchema, joinQual);
-  }
-
-  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
-    }
-  }
-
   public Tuple next() throws IOException {
     if (first) {
       loadRightToHashTable();
     }
 
-    Tuple rightTuple;
-    boolean found = false;
-
-    while(!context.isStopped() && !finished) {
-
-      if (shouldGetLeftTuple) { // initially, it is true.
-        // getting new outer
-        leftTuple = leftChild.next(); // it comes from a disk
-        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-          finished = true;
-          return null;
-        }
-
-        // getting corresponding right
-        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
-        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
-        if (rightTuples != null) { // found right tuples on in-memory hash table.
-          iterator = rightTuples.iterator();
-          shouldGetLeftTuple = false;
-        } else {
-          // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(frameTuple, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          shouldGetLeftTuple = true;
-          return outTuple;
-        }
-      }
-
-      // getting a next right tuple on in-memory hash table.
-      rightTuple = iterator.next();
-      if (!iterator.hasNext()) { // no more right tuples for this hash key
-        shouldGetLeftTuple = true;
-      }
-
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-
-      // if there is no join filter, it is always true.
-      boolean satisfiedWithFilter = joinFilter == null || joinFilter.eval(frameTuple).isTrue();
-      boolean satisfiedWithJoinCondition = joinQual.eval(frameTuple).isTrue();
-
-      // if a composited tuple satisfies with both join filter and join condition
-      if (satisfiedWithJoinCondition && satisfiedWithFilter) {
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
-      } else {
-
-        // if join filter is satisfied, the left outer join (LOJ) operator should return the null padded tuple
-        // only once. Then, LOJ operator should take the next left tuple.
-        if (!satisfiedWithFilter) {
-          shouldGetLeftTuple = true;
-        }
-
-        // null padding
-        Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-        frameTuple.set(leftTuple, nullPaddedTuple);
-
+    while (!context.isStopped() && !finished) {
+      if (iterator != null && iterator.hasNext()) {
+        frameTuple.setRight(iterator.next());
         projector.eval(frameTuple, outTuple);
         return outTuple;
       }
-    }
-
-    return outTuple;
-  }
-
-  protected void loadRightToHashTable() throws IOException {
-    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
-    if (scanExec.canBroadcast()) {
-      /* If this table can broadcast, all tasks in a node will share the same cache */
-      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
-          context, scanExec.getCanonicalName(), scanExec.getFragments());
-      loadRightFromCache(key);
-    } else {
-      this.tupleSlots = buildRightToHashTable();
-    }
-
-    first = false;
-  }
-
-  protected void loadRightFromCache(TableCacheKey key) throws IOException {
-    ExecutionBlockSharedResource sharedResource = context.getSharedResource();
-    synchronized (sharedResource.getLock()) {
-      if (sharedResource.hasBroadcastCache(key)) {
-        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
-        this.tupleSlots = data.getData();
-        this.cachedRightTableStats = data.getTableStats();
-      } else {
-        CacheHolder.BroadcastCacheHolder holder =
-            new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
-        sharedResource.addBroadcastCache(key, holder);
-        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
-        this.tupleSlots = data.getData();
-        this.cachedRightTableStats = data.getTableStats();
+      Tuple leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
       }
-    }
-  }
+      frameTuple.setLeft(leftTuple);
 
-  private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
-
-    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
-      keyTuple = new VTuple(joinKeyPairs.size());
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      if (leftFiltered(leftTuple)) {
+        iterator = nullIterator(rightNumCols);
+        continue;
       }
 
-      List<Tuple> newValue = map.get(keyTuple);
-
-      if (newValue != null) {
-        newValue.add(tuple);
-      } else {
-        newValue = new ArrayList<Tuple>();
-        newValue.add(tuple);
-        map.put(keyTuple, newValue);
+      // getting corresponding right
+      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      Iterator<Tuple> rightTuples = rightFiltered(hashed);
+      if (!rightTuples.hasNext()) {
+        //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
+        //output a tuple with the nulls padded rightTuple
+        iterator = nullIterator(rightNumCols);
+        continue;
       }
+      iterator = rightTuples;
     }
 
-    return map;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-
-    tupleSlots.clear();
-    first = true;
-
-    finished = false;
-    iterator = null;
-    shouldGetLeftTuple = true;
-  }
-
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-    tupleSlots.clear();
-    tupleSlots = null;
-    iterator = null;
-    plan = null;
-    joinQual = null;
-    joinFilter = null;
-    projector = null;
-  }
-
-  public JoinNode getPlan() {
-    return this.plan;
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    if (leftChild == null) {
-      return inputStats;
-    }
-    TableStats leftInputStats = leftChild.getInputStats();
-    inputStats.setNumBytes(0);
-    inputStats.setReadBytes(0);
-    inputStats.setNumRows(0);
-
-    if (leftInputStats != null) {
-      inputStats.setNumBytes(leftInputStats.getNumBytes());
-      inputStats.setReadBytes(leftInputStats.getReadBytes());
-      inputStats.setNumRows(leftInputStats.getNumRows());
-    }
-
-    TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
-    if (rightInputStats != null) {
-      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
-      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
-      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
-    }
-
-    return inputStats;
+    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 32e6d08..41e842a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -50,50 +50,34 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
    * @return The tuple which is firstly matched to a given join condition.
    * @throws java.io.IOException
    */
+  @Override
   public Tuple next() throws IOException {
     if (first) {
       loadRightToHashTable();
     }
 
-    Tuple rightTuple;
-    boolean notFound;
-
     while(!context.isStopped() && !finished) {
-
-      // getting new outer
-      leftTuple = leftChild.next(); // it comes from a disk
-      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
-        finished = true;
-        return null;
+      if (iterator != null && iterator.hasNext()) {
+        frameTuple.setRight(iterator.next());
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
       }
-
-      // Try to find a hash bucket in in-memory hash table
-      getKeyLeftTuple(leftTuple, leftKeyTuple);
-      List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
-      if (rightTuples != null) {
-        // if found, it gets a hash bucket from the hash table.
-        iterator = rightTuples.iterator();
-      } else {
+      // getting new outer
+      Tuple leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = leftTuple == null;
         continue;
       }
 
-      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
-      // If it finds any matched tuple, it returns the tuple immediately.
-      notFound = true;
-      while (notFound && iterator.hasNext()) {
-        rightTuple = iterator.next();
-        frameTuple.set(leftTuple, rightTuple);
-        if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found
-          notFound = false;
-          projector.eval(frameTuple, outTuple);
-        }
-      }
+      frameTuple.setLeft(leftTuple);
 
-      if (!notFound) { // if there is no matched tuple
-        break;
+      // Try to find a hash bucket in in-memory hash table
+      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      if (hashed != null && rightFiltered(hashed).hasNext()) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = nullIterator(0);
       }
     }
-
-    return outTuple;
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
deleted file mode 100644
index 735623d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class NLLeftOuterJoinExec extends CommonJoinExec {
-  // temporal tuples and states for nested loop join
-  private boolean needNextRightTuple;
-  private FrameTuple frameTuple;
-  private Tuple leftTuple = null;
-  private Tuple rightTuple = null;
-  private Tuple outTuple = null;
-
-  private boolean foundAtLeastOneMatch;
-  private int rightNumCols;
-
-  public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
-                             PhysicalExec rightChild) {
-    super(context, plan, leftChild, rightChild);
-    // for join
-    needNextRightTuple = true;
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-
-    foundAtLeastOneMatch = false;
-    rightNumCols = rightChild.getSchema().size();
-  }
-
-  public Tuple next() throws IOException {
-    while (!context.isStopped()) {
-      if (needNextRightTuple) {
-        leftTuple = leftChild.next();
-        if (leftTuple == null) {
-          return null;
-        }
-        needNextRightTuple = false;
-        // a new tuple from the left child has initially no matches on the right operand
-        foundAtLeastOneMatch = false;
-      }
-      rightTuple = rightChild.next();
-
-      if (rightTuple == null) {
-        // the scan of the right operand is finished with no matches found
-        if(foundAtLeastOneMatch == false){
-          //output a tuple with the nulls padded rightTuple
-          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-          frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(frameTuple, outTuple);
-          // we simulate we found a match, which is exactly the null padded one
-          foundAtLeastOneMatch = true;
-          needNextRightTuple = true;
-          rightChild.rescan();
-          return outTuple;
-        } else {
-          needNextRightTuple = true;
-          rightChild.rescan();
-          continue;
-        }
-      }
-
-      frameTuple.set(leftTuple, rightTuple);
-      ;
-      if (joinQual.eval(frameTuple).isTrue()) {
-        projector.eval(frameTuple, outTuple);
-        foundAtLeastOneMatch = true;
-        return outTuple;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    needNextRightTuple = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index 505b599..c4d90a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -79,9 +79,6 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
   RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
 
-  RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
-      throws PhysicalPlanningException;
-
   RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 7abfbe6..fd825b1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -102,7 +102,6 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
    * @throws IOException
    */
   public Tuple next() throws IOException {
-    Tuple previous;
 
     while (!context.isStopped()) {
       boolean newRound = false;
@@ -121,7 +120,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
 
         // The finalizing stage, where remaining tuples on the only right are transformed into left-padded results
         if (end) {
-          if (initRightDone == false) {
+          if (!initRightDone) {
             // maybe the left operand was empty => the right one didn't have the chance to initialize
             rightTuple = rightChild.next();
             initRightDone = true;
@@ -160,18 +159,24 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
           }
         }
 
-        if(rightTuple == null){
+        if(rightTuple == null) {
           rightTuple = rightChild.next();
-
-          if(rightTuple != null){
-            initRightDone = true;
-          }
-          else {
+          if (rightTuple == null) {
             initRightDone = true;
             end = true;
             continue;
           }
         }
+        if (rightFiltered(rightTuple)) {
+          Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+          frameTuple.set(nullPaddedTuple, rightTuple);
+          projector.eval(frameTuple, outTuple);
+
+          rightTuple = null;
+          return outTuple;
+        }
+        initRightDone = true;
+
         //////////////////////////////////////////////////////////////////////
         // END INITIALIZATION STAGE
         //////////////////////////////////////////////////////////////////////
@@ -203,10 +208,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
 
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
-            rightTuple = rightChild.next();
-            if(rightTuple == null) {
-              end = true;
-            }
+            rightTuple = null;
             return outTuple;
 
           } else if (cmp < 0) {
@@ -223,6 +225,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
         // END MOVE FORWARDING STAGE
         //////////////////////////////////////////////////////////////////////
 
+        Tuple previous = null;
         // once a match is found, retain all tuples with this key in tuple slots on each side
         if(!end) {
           endInPopulationStage = false;
@@ -257,6 +260,19 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
             endInPopulationStage = true;
           }
         } // if end false
+        if (previous != null && rightFiltered(previous)) {
+          Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+          frameTuple.set(nullPaddedTuple, previous);
+          projector.eval(frameTuple, outTuple);
+
+          // reset tuple slots for a new round
+          leftTupleSlots.clear();
+          innerTupleSlots.clear();
+          posRightTupleSlots = -1;
+          posLeftTupleSlots = -1;
+
+          return outTuple;
+        }
       } // if newRound
 
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
index 6a5c0bf..addca49 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.utils;
 
-import com.google.common.collect.Maps;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.Tuple;
@@ -66,7 +65,7 @@ public interface CacheHolder<T> {
 
     @Override
     public Map<Tuple, List<Tuple>> getData() {
-      return Maps.newHashMap(data);
+      return data;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 95debd4..7210214 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -206,10 +206,10 @@ public class TestHashSemiJoinExec {
     // expect result without duplicated tuples.
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.get(0).asInt4());
-      assertTrue(i == tuple.get(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
-      assertTrue(10 + i == tuple.get(3).asInt4());
+      assertEquals(i, tuple.get(0).asInt4());
+      assertEquals(i, tuple.get(1).asInt4());
+      assertEquals("dept_" + i, tuple.get(2).asChars());
+      assertEquals(10 + i, tuple.get(3).asInt4());
 
       i += 2;
     }