Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:25 UTC

[03/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
new file mode 100644
index 0000000..cee0cb0
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -0,0 +1,1023 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.RangeRetrieverHandler;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestPhysicalPlanner {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static SQLAnalyzer analyzer;
+  private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
+  private static AbstractStorageManager sm;
+  private static Path testDir;
+  private static Session session = LocalTajoTestingUtility.createDummySession();
+
+  private static TableDesc employee = null;
+  private static TableDesc score = null;
+
+  private static MasterPlan masterPlan;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+
+    util.startCatalogCluster();
+    conf = util.getConfiguration();
+    testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
+      catalog.createFunction(funcDesc);
+    }
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("name", Type.TEXT);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    Schema scoreSchema = new Schema();
+    scoreSchema.addColumn("deptname", Type.TEXT);
+    scoreSchema.addColumn("class", Type.TEXT);
+    scoreSchema.addColumn("score", Type.INT4);
+    scoreSchema.addColumn("nullable", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+    for (int i = 0; i < 100; i++) {
+      tuple.put(new Datum[] {DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i)});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    employee = new TableDesc(
+        CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), employeeSchema, employeeMeta,
+        employeePath);
+    catalog.createTable(employee);
+
+    Path scorePath = new Path(testDir, "score");
+    TableMeta scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV, new Options());
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath);
+    appender.init();
+    score = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta,
+        scorePath);
+    tuple = new VTuple(scoreSchema.size());
+    int m = 0;
+    for (int i = 1; i <= 5; i++) {
+      for (int k = 3; k < 5; k++) {
+        for (int j = 1; j <= 3; j++) {
+          tuple.put(
+              new Datum[] {
+                  DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5)
+                  DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2)
+                  DatumFactory.createInt4(j), // 1 ~ 3
+              m % 3 == 1 ? DatumFactory.createText("one") : NullDatum.get()});
+          appender.addTuple(tuple);
+          m++;
+        }
+      }
+    }
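+    // 5 names x 2 classes x 3 scores = 30 rows in total; rows where m % 3 == 1
+    // (10 of the 30) carry "one" in the nullable column.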
+    appender.flush();
+    appender.close();
+    catalog.createTable(score);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+
+    masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  private String[] QUERIES = {
+      "select name, empId, deptName from employee", // 0
+      "select name, empId, e.deptName, manager from employee as e, dept as dp", // 1
+      "select name, empId, e.deptName, manager, score from employee as e, dept, score", // 2
+      "select p.deptName, sum(score) from dept as p, score group by p.deptName having sum(score) > 30", // 3
+      "select p.deptName, score from dept as p, score order by score asc", // 4
+      "select name from employee where empId = 100", // 5
+      "select deptName, class, score from score", // 6
+      "select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 7
+      "select count(*), max(score), min(score) from score", // 8
+      "select count(deptName) from score", // 9
+      "select managerId, empId, deptName from employee order by managerId, empId desc", // 10
+      "select deptName, nullable from score group by deptName, nullable", // 11
+      "select 3 < 4 as ineq, 3.5 * 2 as score", // 12
+      "select (1 > 0) and 3 > 1", // 13
+      "select sum(score), max(score), min(score) from score", // 14
+      "select deptname, sum(score), max(score), min(score) from score group by deptname", // 15
+      "select name from employee where empid >= 0", // 16
+  };
+
+  @Test
+  public final void testCreateScanPlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+    optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    Tuple tuple;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertTrue(tuple.contains(0));
+      assertTrue(tuple.contains(1));
+      assertTrue(tuple.contains(2));
+      i++;
+    }
+    exec.close();
+    assertEquals(100, i);
+  }
+
+  @Test
+  public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[16]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+    optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    Tuple tuple;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertTrue(tuple.contains(0));
+      i++;
+    }
+    exec.close();
+    assertEquals(100, i);
+  }
+
+  @Test
+  public final void testGroupByPlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[7]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    int i = 0;
+    Tuple tuple;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    exec.close();
+    assertEquals(10, i);
+  }
+
+  @Test
+  public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
+    // TODO - currently, this query does not use a hash-based group-by operator.
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir(
+        "target/test-data/testHashGroupByPlanWithALLField");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[15]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    int i = 0;
+    Tuple tuple;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertEquals(12, tuple.get(1).asInt4()); // sum
+      assertEquals(3, tuple.get(2).asInt4()); // max
+      assertEquals(1, tuple.get(3).asInt4()); // min
+      i++;
+    }
+    exec.close();
+    assertEquals(5, i);
+  }
+
+  @Test
+  public final void testSortGroupByPlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[]{frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[7]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+
+    /*HashAggregateExec hashAgg = (HashAggregateExec) exec;
+
+    SeqScanExec scan = (SeqScanExec) hashAgg.getSubOp();
+
+    Column [] grpColumns = hashAgg.getAnnotation().getGroupingColumns();
+    QueryBlock.SortSpec [] specs = new QueryBlock.SortSpec[grpColumns.length];
+    for (int i = 0; i < grpColumns.length; i++) {
+      specs[i] = new QueryBlock.SortSpec(grpColumns[i], true, false);
+    }
+    SortNode annotation = new SortNode(specs);
+    annotation.setInSchema(scan.getSchema());
+    annotation.setOutSchema(scan.getSchema());
+    SortExec sort = new SortExec(annotation, scan);
+    exec = new SortAggregateExec(hashAgg.getAnnotation(), sort);*/
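+    // NOTE: a similar hash-to-sort aggregation rewrite is now requested through
+    // Enforcer.enforceSortAggregation(), as testGroupByEnforcer() below shows.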
+
+    int i = 0;
+    Tuple tuple;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    assertEquals(10, i);
+
+    exec.rescan();
+    i = 0;
+    while ((tuple = exec.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    exec.close();
+    assertEquals(10, i);
+  }
+
+  private String[] CreateTableAsStmts = {
+      "create table grouped1 as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 0
+      "create table grouped2 using rcfile as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 1
+      "create table grouped3 partition by column (dept text,  class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2
+  };
+
+  @Test
+  public final void testStorePlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    ctx.setOutputPath(new Path(workDir, "grouped1"));
+
+    Expr context = analyzer.parse(CreateTableAsStmts[0]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
+        ctx.getOutputPath());
+    scanner.init();
+    Tuple tuple;
+    int i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    assertEquals(10, i);
+    scanner.close();
+
+    // Examine the statistics information
+    assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+  }
+
+  @Test
+  public final void testStorePlanWithRCFile() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    ctx.setOutputPath(new Path(workDir, "grouped2"));
+
+    Expr context = analyzer.parse(CreateTableAsStmts[1]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
+        ctx.getOutputPath());
+    scanner.init();
+    Tuple tuple;
+    int i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    assertEquals(10, i);
+    scanner.close();
+
+    // Examine the statistics information
+    assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+  }
+
+  @Test
+  public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    ctx.setOutputPath(new Path(workDir, "grouped3"));
+
+    Expr context = analyzer.parse(CreateTableAsStmts[2]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    assertTrue(exec instanceof SortBasedColPartitionStoreExec);
+  }
+
+  @Test
+  public final void testEnforceForHashBasedColumnPartitionStorePlan() throws IOException, PlanningException {
+
+    Expr context = analyzer.parse(CreateTableAsStmts[2]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan);
+    CreateTableNode createTableNode = rootNode.getChild();
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION);
+
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(enforcer);
+    ctx.setOutputPath(new Path(workDir, "grouped4"));
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    assertTrue(exec instanceof HashBasedColPartitionStoreExec);
+  }
+
+  @Test
+  public final void testEnforceForSortBasedColumnPartitionStorePlan() throws IOException, PlanningException {
+
+    Expr context = analyzer.parse(CreateTableAsStmts[2]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan);
+    CreateTableNode createTableNode = rootNode.getChild();
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION);
+
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(enforcer);
+    ctx.setOutputPath(new Path(workDir, "grouped5"));
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    assertTrue(exec instanceof SortBasedColPartitionStoreExec);
+  }
+
+  @Test
+  public final void testPartitionedStorePlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[7]);
+    LogicalPlan plan = planner.createPlan(session, context);
+
+    int numPartitions = 3;
+    Column key1 = new Column("default.score.deptname", Type.TEXT);
+    Column key2 = new Column("default.score.class", Type.TEXT);
+    DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        ShuffleType.HASH_SHUFFLE, numPartitions);
+    dataChannel.setShuffleKeys(new Column[]{key1, key2});
+    ctx.setDataChannel(dataChannel);
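+    // Hash-shuffling on (deptname, class) should write one output file per
+    // partition under workDir/"output"; the listStatus() check below verifies this.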
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
+
+    FileSystem fs = sm.getFileSystem();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    Path path = new Path(workDir, "output");
+    FileStatus [] list = fs.listStatus(path);
+    assertEquals(numPartitions, list.length);
+
+    FileFragment[] fragments = new FileFragment[list.length];
+    int i = 0;
+    for (FileStatus status : list) {
+      fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
+    }
+    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+    scanner.init();
+
+    Tuple tuple;
+    i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
+      i++;
+    }
+    assertEquals(10, i);
+    scanner.close();
+
+    // Examine the statistics information
+    assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+  }
+
+  @Test
+  public final void testPartitionedStorePlanWithEmptyGroupingSet()
+      throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        "target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[14]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+    int numPartitions = 1;
+    DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        ShuffleType.HASH_SHUFFLE, numPartitions);
+    dataChannel.setShuffleKeys(new Column[]{});
+    ctx.setDataChannel(dataChannel);
+    optimizer.optimize(plan);
+
+    TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    Path path = new Path(workDir, "output");
+    FileSystem fs = sm.getFileSystem();
+
+    FileStatus [] list = fs.listStatus(path);
+    assertEquals(numPartitions, list.length);
+
+    FileFragment[] fragments = new FileFragment[list.length];
+    int i = 0;
+    for (FileStatus status : list) {
+      fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
+    }
+    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+    scanner.init();
+    Tuple tuple;
+    i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(60, tuple.get(0).asInt4()); // sum
+      assertEquals(3, tuple.get(1).asInt4()); // max
+      assertEquals(1, tuple.get(2).asInt4()); // min
+      i++;
+    }
+    assertEquals(1, i);
+    scanner.close();
+
+    // Examine the statistics information
+    assertEquals(1, ctx.getResultStats().getNumRows().longValue());
+  }
+
+  @Test
+  public final void testAggregationFunction() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[8]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    // Set all aggregation functions to the first phase mode
+    GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      function.setFirstPhase();
+    }
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    exec.init();
+    Tuple tuple = exec.next();
+    assertEquals(30, tuple.get(0).asInt8());
+    assertEquals(3, tuple.get(1).asInt4());
+    assertEquals(1, tuple.get(2).asInt4());
+    assertNull(exec.next());
+    exec.close();
+  }
+
+  @Test
+  public final void testCountFunction() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[9]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    // Set all aggregation functions to the first phase mode
+    GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      function.setFirstPhase();
+    }
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    Tuple tuple = exec.next();
+    assertEquals(30, tuple.get(0).asInt8());
+    assertNull(exec.next());
+    exec.close();
+  }
+
+  @Test
+  public final void testGroupByWithNullValue() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[11]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    int count = 0;
+    exec.init();
+    while(exec.next() != null) {
+      count++;
+    }
+    exec.close();
+    assertEquals(10, count);
+  }
+
+  @Test
+  public final void testUnionPlan() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+    LogicalRootNode root = (LogicalRootNode) rootNode;
+    UnionNode union = plan.createNode(UnionNode.class);
+    union.setLeftChild(root.getChild());
+    union.setRightChild(root.getChild());
+    root.setChild(union);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+
+    int count = 0;
+    exec.init();
+    while(exec.next() != null) {
+      count++;
+    }
+    exec.close();
+    assertEquals(200, count);
+  }
+
+  @Test
+  public final void testEvalExpr() throws IOException, PlanningException {
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { }, workDir);
+    Expr expr = analyzer.parse(QUERIES[12]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    Tuple tuple;
+    exec.init();
+    tuple = exec.next();
+    exec.close();
+    assertEquals(true, tuple.get(0).asBool());
+    assertTrue(7.0d == tuple.get(1).asFloat8());
+
+    expr = analyzer.parse(QUERIES[13]);
+    plan = planner.createPlan(session, expr);
+    rootNode = optimizer.optimize(plan);
+
+    phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    tuple = exec.next();
+    exec.close();
+    assertEquals(DatumFactory.createBool(true), tuple.get(0));
+  }
+
+  public final String [] createIndexStmt = {
+      "create index idx_employee on employee using bst (name null first, empId desc)"
+  };
+
+  //@Test
+  public final void testCreateIndex() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    Expr context = analyzer.parse(createIndexStmt[0]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    while (exec.next() != null) {
+    }
+    exec.close();
+
+    FileStatus [] list = sm.getFileSystem().listStatus(StorageUtil.concatPath(workDir, "index"));
+    assertEquals(2, list.length);
+  }
+
+  final static String [] duplicateElimination = {
+      "select distinct deptname from score",
+  };
+
+  @Test
+  public final void testDuplicateEliminate() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
+        Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(duplicateElimination[0]);
+    LogicalPlan plan = planner.createPlan(session, expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    Tuple tuple;
+
+    int cnt = 0;
+    Set<String> expected = Sets.newHashSet(
+        "name_1", "name_2", "name_3", "name_4", "name_5");
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertTrue(expected.contains(tuple.get(0).asChars()));
+      cnt++;
+    }
+    exec.close();
+    assertEquals(5, cnt);
+  }
+
+  public String [] SORT_QUERY = {
+      "select name, empId from employee order by empId"
+  };
+
+  @Test
+  public final void testIndexedStoreExec() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(SORT_QUERY[0]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+    DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+    channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
+    ctx.setDataChannel(channel);
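+    // RANGE_SHUFFLE is expected to produce sorted output plus a BST index, which
+    // the test reads back from "output/output" and "output/index" below.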
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    Tuple tuple;
+    exec.init();
+    exec.next();
+    exec.close();
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn("?empId", Type.INT4);
+    SortSpec[] sortSpec = new SortSpec[1];
+    sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
+    TupleComparator comp = new TupleComparator(keySchema, sortSpec);
+    BSTIndex bst = new BSTIndex(conf);
+    BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
+        keySchema, comp);
+    reader.open();
+    Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
+    TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new Options());
+    SeekableScanner scanner =
+        StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
+    scanner.init();
+
+    int cnt = 0;
+
+    while(scanner.next() != null) {
+      cnt++;
+    }
+    scanner.reset();
+
+    assertEquals(100, cnt);
+
+    Tuple keytuple = new VTuple(1);
+    for (int i = 1; i < 100; i++) {
+      keytuple.put(0, DatumFactory.createInt4(i));
+      long offsets = reader.find(keytuple);
+      scanner.seek(offsets);
+      tuple = scanner.next();
+
+      assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars()));
+      assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4());
+    }
+
+    // The following exercises RangeRetrieverHandler over a Base64-encoded key range.
+    RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);
+    RangeRetrieverHandler handler = new RangeRetrieverHandler(
+        new File(new Path(workDir, "output").toUri()), keySchema, comp);
+    Map<String,List<String>> kvs = Maps.newHashMap();
+    Tuple startTuple = new VTuple(1);
+    startTuple.put(0, DatumFactory.createInt4(50));
+    kvs.put("start", Lists.newArrayList(
+        new String(Base64.encodeBase64(
+            encoder.toBytes(startTuple), false))));
+    Tuple endTuple = new VTuple(1);
+    endTuple.put(0, DatumFactory.createInt4(80));
+    kvs.put("end", Lists.newArrayList(
+        new String(Base64.encodeBase64(
+            encoder.toBytes(endTuple), false))));
+    FileChunk chunk = handler.get(kvs);
+
+    scanner.seek(chunk.startOffset());
+    keytuple = scanner.next();
+    assertEquals(50, keytuple.get(1).asInt4());
+
+    long endOffset = chunk.startOffset() + chunk.length();
+    while((keytuple = scanner.next()) != null && scanner.getNextOffset() <= endOffset) {
+      assertTrue(keytuple.get(1).asInt4() <= 80);
+    }
+
+    scanner.close();
+  }
+
+  @Test
+  public final void testSortEnforcer() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
+    Expr context = analyzer.parse(SORT_QUERY[0]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
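+    // IN_MEMORY_SORT should lead the physical planner to choose MemSortExec;
+    // MERGE_SORT (second half of this test) should yield ExternalSortExec.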
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(exec instanceof MemSortExec);
+
+    context = analyzer.parse(SORT_QUERY[0]);
+    plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+    rootNode = plan.getRootBlock().getRoot();
+
+    sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+    enforcer = new Enforcer();
+    enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
+    ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(exec instanceof ExternalSortExec);
+  }
+
+  @Test
+  public final void testGroupByEnforcer() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
+    Expr context = analyzer.parse(QUERIES[7]);
+    LogicalPlan plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceHashAggregation(groupByNode.getPID());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertNotNull(PhysicalPlanUtil.findExecutor(exec, HashAggregateExec.class));
+
+    context = analyzer.parse(QUERIES[7]);
+    plan = planner.createPlan(session, context);
+    optimizer.optimize(plan);
+    rootNode = plan.getRootBlock().getRoot();
+
+    groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+    enforcer = new Enforcer();
+    enforcer.enforceSortAggregation(groupByNode.getPID(), null);
+    ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(exec instanceof SortAggregateExec);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
new file mode 100644
index 0000000..c60e05c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestProgressExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = "target/test-data/TestProgressExternalSortExec";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private final int numTuple = 100000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  private TableStats testDataStats;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", TajoDataTypes.Type.INT4);
+    schema.addColumn("empid", TajoDataTypes.Type.INT4);
+    schema.addColumn("deptname", TajoDataTypes.Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + i),
+      });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    System.out.println(appender.getStats().getNumRows() + " rows (" + appender.getStats().getNumBytes() + " Bytes)");
+
+    testDataStats = appender.getStats();
+    employee = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta,
+        employeePath);
+    catalog.createTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId from employee order by managerId, empId"
+  };
+
+  @Test
+  public void testExternalSortExecProgressWithMemTableScanner() throws Exception {
+    testProgress(testDataStats.getNumBytes().intValue() * 20);    // multiply by 20 so the whole input fits in the sort buffer
+  }
+
+  @Test
+  public void testExternalSortExecProgressWithPairWiseMerger() throws Exception {
+    testProgress(testDataStats.getNumBytes().intValue());
+  }
+
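+  // With a sort buffer larger than the whole input (20x above), ExternalSortExec can
+  // presumably sort in memory; with a buffer equal to the input size it must spill
+  // sorted runs and merge them pair-wise. testProgress() checks that both paths
+  // report increasing progress and return correctly ordered, complete results.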
+  private void testProgress(int sortBufferBytesNum) throws Exception {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestProgressExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planned with the user's optimization hint
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
+
+      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
+          ((MemSortExec)sortExec).getPlan(), scan);
+
+      extSort.setSortBufferBytesNum(sortBufferBytesNum);
+      proj.setChild(extSort);
+    } else {
+      ((ExternalSortExec)proj.getChild()).setSortBufferBytesNum(sortBufferBytesNum);
+    }
+
+    Tuple tuple;
+    Tuple preVal = null;
+    Tuple curVal;
+    int cnt = 0;
+    exec.init();
+    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+        new SortSpec[]{
+            new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)),
+            new SortSpec(new Column("empid", TajoDataTypes.Type.INT4))
+        });
+
+    float initProgress = 0.0f;
+    while ((tuple = exec.next()) != null) {
+      if (cnt == 0) {
+        initProgress = exec.getProgress();
+        assertTrue(initProgress > 0.5f && initProgress < 1.0f);
+      }
+
+      if (cnt == testDataStats.getNumRows() / 2) {
+        float progress = exec.getProgress();
+
+        assertTrue(progress > initProgress);
+      }
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+
+    assertEquals(1.0f, exec.getProgress(), 0);
+    assertEquals(numTuple, cnt);
+
+    TableStats tableStats = exec.getInputStats();
+    assertNotNull(tableStats);
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+    assertEquals(cnt, testDataStats.getNumRows().longValue());
+    assertEquals(cnt, tableStats.getNumRows().longValue());
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(1.0f, exec.getProgress(), 0);
+    assertEquals(numTuple, cnt);
+    exec.close();
+    assertEquals(1.0f, exec.getProgress(), 0);
+
+    tableStats = exec.getInputStats();
+    assertNotNull(tableStats);
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+    assertEquals(cnt, testDataStats.getNumRows().longValue());
+    assertEquals(cnt, tableStats.getNumRows().longValue());
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
new file mode 100644
index 0000000..a45e397
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+
+// Right outer hash join is not a distinct physical operator; it reuses HashLeftOuterJoinExec with the input order switched.
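+// For example, "dep3 right outer join emp3 on dep3.dep_id = emp3.dep_id" can be
+// evaluated as "emp3 left outer join dep3 on dep3.dep_id = emp3.dep_id": swapping
+// the children of the join node preserves the result while letting the existing
+// left outer hash join implementation do the work.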
+public class TestRightOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestRightOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(x),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name    | last_name    | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11  | lastname_11  |  1     | 123    | 101
+    //  13     | firstname_13  | lastname_13  |  3     | 369    | 103
+    //  15     | firstname_15  | lastname_15  |  5     | 615    | null
+    //  17     | firstname_17  | lastname_17  |  7     | 861    | null
+    //  19     | firstname_19  | lastname_19  |  9     | 1107   | null
+    //  21     | firstname_21  | lastname_21  |  1     | 123    | 101
+    //  23     | firstname_23  | lastname_23  |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(y),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id"
+  };
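+
+  // Expected row counts below are derived from the fixture data created in
+  // setUp():
+  //   [0] dep3 has 10 rows; emp3 dep_ids {1,1,3,3,5,7,9} match dep_ids 1 and 3
+  //       twice and 5, 7, 9 once, so 7 matched rows + 5 null-padded dep3 rows = 12
+  //   [1] job3 has 3 rows; emp3 job_ids {101,101,103,103} leave job 102
+  //       unmatched, so 4 matched rows + 1 null-padded job3 row = 5
+  //   [2] every one of the 7 emp3 rows is preserved = 7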
+
+  @Test
+  public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
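+    // No join algorithm is enforced here, so the physical planner chooses one
+    // itself; for inputs this small it is expected to pick an in-memory hash
+    // join (the branch below fails the test if a merge join is chosen).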
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+      // the planner is not expected to choose a merge join for this small data set
+      fail("expected a hash join, but the planner chose a merge join");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count = count + 1;
+      }
+      exec.close();
+      assertEquals(12, count);
+    }
+  }
+
+
+  @Test
+  public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+      // the planner is not expected to choose a merge join for this small data set
+      fail("expected a hash join, but the planner chose a merge join");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count = count + 1;
+      }
+      exec.close();
+      assertEquals(5, count);
+    }
+  }
+
+  @Test
+  public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+      // the planner is not expected to choose a merge join for this small data set
+      fail("expected a hash join, but the planner chose a merge join");
+    } else {
+      int count = 0;
+      exec.init();
+      while (exec.next() != null) {
+        //TODO check contents
+        count = count + 1;
+      }
+      exec.close();
+      assertEquals(7, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
new file mode 100644
index 0000000..5b504b2
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestRightOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestRightOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static final Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String DEP4_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    // 10     | dept_10   | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    appender4.init();
+    Tuple tuple4 = new VTuple(dep4Schema.size());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path);
+    catalog.createTable(dep4);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name    | last_name    | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11  | lastname_11  |  1     | 123    | 101
+    //  13     | firstname_13  | lastname_13  |  3     | 369    | 103
+    //  15     | firstname_15  | lastname_15  |  5     | 615    | null
+    //  17     | firstname_17  | lastname_17  |  7     | 861    | null
+    //  19     | firstname_19  | lastname_19  |  9     | 1107   | null
+    //  21     | firstname_21  | lastname_21  |  1     | 123    | 101
+    //  23     | firstname_23  | lastname_23  |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(y),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls, right continues after left
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 right outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 right outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] one operand is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 right outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
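+
+  // Expected row counts below are derived from the fixture data created in
+  // setUp():
+  //   [0] 7 matched emp3/dep3 rows + 5 null-padded dep3 rows = 12
+  //   [1] 4 matched emp3/job3 rows + 1 null-padded job3 row (102) = 5
+  //   [2] every one of the 7 emp3 rows is preserved = 7
+  //   [3] 7 matched rows + 6 null-padded dep4 rows (0, 2, 4, 6, 8, 10) = 13
+  //   [4] phone3 is empty, so the right outer join produces no rows = 0
+  //   [5] every one of the 7 emp3 rows is preserved, null-padded for phone3 = 7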
+
+  @Test
+  public final void testRightOuterMergeJoin0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
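+    // The Enforcer property directs the physical planner to realize this join
+    // as a sort-merge join, so createPlan() below should yield a
+    // RightOuterMergeJoinExec (checked by the assertTrue further down).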
+
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testRightOuterMergeJoin1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(5, count);
+  }
+
+  @Test
+  public final void testRightOuterMergeJoin2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  @Test
+  public final void testRightOuterMergeJoin3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags =
+        StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(13, count);
+  }
+
+  @Test
+  public final void testRightOuterMergeJoin4() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(0, count);
+  }
+
+  @Test
+  public final void testRightOuterMergeJoin5() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[5]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}