Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:27 UTC

[05/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
new file mode 100644
index 0000000..b56ab47
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestBroadcastJoinPlan {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBroadcastJoinPlan";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private Path testDir;
+
+  private TableDesc smallTable1;
+  private TableDesc smallTable2;
+  private TableDesc smallTable3;
+  private TableDesc largeTable1;
+  private TableDesc largeTable2;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    conf = util.getConfiguration();
+    conf.setLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD, 500 * 1024);
+    conf.setBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO, true);
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog = util.startCatalogCluster().getCatalog();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    util.getMiniCatalogCluster().getCatalogServer().reloadBuiltinFunctions(TajoMaster.initBuiltinFunctions());
+
+    Schema smallTable1Schema = new Schema();
+    smallTable1Schema.addColumn("small1_id", TajoDataTypes.Type.INT4);
+    smallTable1Schema.addColumn("small1_contents", TajoDataTypes.Type.TEXT);
+    smallTable1 = makeTestData("default.small1", smallTable1Schema, 10 * 1024);
+
+    Schema smallTable2Schema = new Schema();
+    smallTable2Schema.addColumn("small2_id", TajoDataTypes.Type.INT4);
+    smallTable2Schema.addColumn("small2_contents", TajoDataTypes.Type.TEXT);
+    smallTable2 = makeTestData("default.small2", smallTable2Schema, 10 * 1024);
+
+    Schema smallTable3Schema = new Schema();
+    smallTable3Schema.addColumn("small3_id", TajoDataTypes.Type.INT4);
+    smallTable3Schema.addColumn("small3_contents", TajoDataTypes.Type.TEXT);
+    smallTable3 = makeTestData("default.small3", smallTable3Schema, 10 * 1024);
+
+    Schema largeTable1Schema = new Schema();
+    largeTable1Schema.addColumn("large1_id", TajoDataTypes.Type.INT4);
+    largeTable1Schema.addColumn("large1_contents", TajoDataTypes.Type.TEXT);
+    largeTable1 = makeTestData("default.large1", largeTable1Schema, 1024 * 1024);  //1M
+
+    Schema largeTable2Schema = new Schema();
+    largeTable2Schema.addColumn("large2_id", TajoDataTypes.Type.INT4);
+    largeTable2Schema.addColumn("large2_contents", TajoDataTypes.Type.TEXT);
+    largeTable2 = makeTestData("default.large2", largeTable2Schema, 1024 * 1024);  //1M
+
+    catalog.createTable(smallTable1);
+    catalog.createTable(smallTable2);
+    catalog.createTable(largeTable1);
+    catalog.createTable(largeTable2);
+
+    analyzer = new SQLAnalyzer();
+  }
+
+  private TableDesc makeTestData(String tableName, Schema schema, int dataSize) throws Exception {
+    TableMeta tableMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+    Path dataPath = new Path(testDir, tableName + ".csv");
+
+    // Build a 10,000-character payload ("0123456789" repeated 1,000 times).
+    StringBuilder contentsBuilder = new StringBuilder(10000);
+    for (int i = 0; i < 1000; i++) {
+      for (int j = 0; j < 10; j++) {
+        contentsBuilder.append(j);
+      }
+    }
+    String contentsData = contentsBuilder.toString();
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema,
+        dataPath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    int writtenSize = 0;
+    int count = 0;
+    while (true) {
+      TextDatum textDatum = DatumFactory.createText(count + "_" + contentsData);
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(count), textDatum });
+      appender.addTuple(tuple);
+      count++;  // keep row ids distinct across tuples
+
+      writtenSize += textDatum.size();
+      if (writtenSize >= dataSize) {
+        break;
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    TableDesc tableDesc = CatalogUtil.newTableDesc(tableName, schema, tableMeta, dataPath);
+    TableStats tableStats = new TableStats();
+    FileSystem fs = dataPath.getFileSystem(conf);
+    tableStats.setNumBytes(fs.getFileStatus(dataPath).getLen());
+
+    tableDesc.setStats(tableStats);
+
+    return tableDesc;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public final void testBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join small2 on small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395714781593_0000_000007 (TERMINAL)
+        |-eb_1395714781593_0000_000006 (ROOT)
+            |-eb_1395714781593_0000_000005 (LEAF)
+    */
+
+    ExecutionBlock terminalEB = masterPlan.getRoot();
+    assertEquals(1, masterPlan.getChildCount(terminalEB.getId()));
+
+    ExecutionBlock rootEB = masterPlan.getChild(terminalEB.getId(), 0);
+    assertEquals(1, masterPlan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock leafEB = masterPlan.getChild(rootEB.getId(), 0);
+    assertNotNull(leafEB);
+
+    assertEquals(0, masterPlan.getChildCount(leafEB.getId()));
+    Collection<String> broadcastTables = leafEB.getBroadcastTables();
+    assertEquals(2, broadcastTables.size());
+
+    assertTrue(broadcastTables.contains("default.small1"));
+    assertTrue(broadcastTables.contains("default.small2"));
+    assertFalse(broadcastTables.contains("default.large1"));
+
+    LogicalNode leafNode = leafEB.getPlan();
+    assertEquals(NodeType.GROUP_BY, leafNode.getType());
+
+    LogicalNode joinNode = ((GroupbyNode)leafNode).getChild();
+    assertEquals(NodeType.JOIN, joinNode.getType());
+
+    LogicalNode leftNode = ((JoinNode)joinNode).getLeftChild();
+    LogicalNode rightNode = ((JoinNode)joinNode).getRightChild();
+
+    assertEquals(NodeType.JOIN, leftNode.getType());
+    assertEquals(NodeType.SCAN, rightNode.getType());
+
+    LogicalNode lastLeftNode = ((JoinNode)leftNode).getLeftChild();
+    LogicalNode lastRightNode = ((JoinNode)leftNode).getRightChild();
+
+    assertEquals(NodeType.SCAN, lastLeftNode.getType());
+    assertEquals(NodeType.SCAN, lastRightNode.getType());
+  }
+
+  @Test
+  public final void testNotBroadcastJoinTwoLargeTable() throws IOException, PlanningException {
+    // Joining two large tables must not produce a broadcast join.
+    String query = "select count(*) from large1 " +
+        "join large2 on large1_id = large2_id ";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+    }
+  }
+
+  @Test
+  public final void testTwoBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join large2 on large1_id = large2_id " +
+        "join small2 on large2_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395736346625_0000_000009
+      |-eb_1395736346625_0000_000008 (GROUP-BY)
+         |-eb_1395736346625_0000_000007 (GROUP-BY, JOIN)
+           |-eb_1395736346625_0000_000006 (LEAF, JOIN)
+           |-eb_1395736346625_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if(index == 0) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertFalse(broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small1"));
+      } else if(index == 1) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+        assertFalse(broadcastTables.contains("default.large2"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+
+  @Test
+  public final void testNotBroadcastJoinSubquery() throws IOException, PlanningException {
+    // A join involving a table subquery must not produce a broadcast join.
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on a.small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395749810370_0000_000007
+       |-eb_1395749810370_0000_000006 (GROUP-BY)
+          |-eb_1395749810370_0000_000005 (GROUP-BY, JOIN)
+             |-eb_1395749810370_0000_000004 (LEAF, SCAN, large1)
+             |-eb_1395749810370_0000_000003 (JOIN)
+                |-eb_1395749810370_0000_000002 (LEAF, SCAN, small2)
+                |-eb_1395749810370_0000_000001 (LEAF, TABLE_SUBQUERY, small1)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      index++;
+    }
+
+    assertEquals(7, index);
+  }
+
+  @Test
+  public final void testBroadcastJoinSubquery() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on large1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395794091662_0000_000007
+       |-eb_1395794091662_0000_000006
+          |-eb_1395794091662_0000_000005 (JOIN)
+             |-eb_1395794091662_0000_000004 (LEAF, SUBQUERY)
+             |-eb_1395794091662_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if(index == 0) {
+        //LEAF, JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertFalse(broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      } else if(index == 1) {
+        //LEAF, SUBQUERY
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      } else if(index == 2) {
+        //JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
new file mode 100644
index 0000000..ab56bea
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMasterPlan {
+
+  @Test
+  public void testConnect() {
+    MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+
+    ExecutionBlock eb1 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb2 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb3 = masterPlan.newExecutionBlock();
+
+    masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+
+    masterPlan.addConnect(eb3, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId()));
+
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb3.getId()));
+
+    masterPlan.disconnect(eb3, eb2);
+    assertFalse(masterPlan.isConnected(eb3, eb2));
+    assertFalse(masterPlan.isReverseConnected(eb2, eb3));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
new file mode 100644
index 0000000..c79796b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private static final int OUTER_TUPLE_NUM = 1000;
+  private static final int INNER_TUPLE_NUM = 1000;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("memid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // employee (managerId, empId, memId, deptName)
+  // people (empId, fk_memId, name, age)
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people p",
+      "select managerId, e.empId, deptName, e.memId from employee as e " +
+          "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
+
+  @Test
+  public final void testBNLCrossJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(OUTER_TUPLE_NUM * INNER_TUPLE_NUM / 2, i); // 1000 outer rows x 500 inner rows
+  }
+
+  @Test
+  public final void testBNLInnerJoin() throws IOException, PlanningException {
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+        context).getRootBlock().getRoot();
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+        merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(INNER_TUPLE_NUM / 2, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
new file mode 100644
index 0000000..a47bde3
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Stack;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class TestBSTIndexExec {
+
+  private TajoConf conf;
+  private Path idxPath;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Schema idxSchema;
+  private TupleComparator comp;
+  private BSTIndex.BSTIndexWriter writer;
+  private HashMap<Integer, Integer> randomValues;
+  private int rndKey = -1;
+  private FileSystem fs;
+  private TableMeta meta;
+  private Path tablePath;
+
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TajoTestingCluster util;
+
+  @Before
+  public void setup() throws Exception {
+    this.randomValues = new HashMap<Integer, Integer>();
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+
+    Path workDir = CommonTestingUtil.getTestDir();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    sm = StorageManagerFactory.getStorageManager(conf, workDir);
+
+    idxPath = new Path(workDir, "test.idx");
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    this.idxSchema = new Schema();
+    idxSchema.addColumn("managerid", Type.INT4);
+    SortSpec[] sortKeys = new SortSpec[1];
+    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
+    this.comp = new TupleComparator(idxSchema, sortKeys);
+
+    this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
+        BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
+    writer.setLoadNum(100);
+    writer.open();
+    long offset;
+
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
+    fs = tablePath.getFileSystem(conf);
+    fs.mkdirs(tablePath.getParent());
+
+    FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+        tablePath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < 10000; i++) {
+      
+      Tuple key = new VTuple(this.idxSchema.size());
+      // Use a name distinct from the rndKey field to avoid shadowing it.
+      int randomKey = rnd.nextInt(250);
+      if (this.randomValues.containsKey(randomKey)) {
+        int t = this.randomValues.remove(randomKey) + 1;
+        this.randomValues.put(randomKey, t);
+      } else {
+        this.randomValues.put(randomKey, 1);
+      }
+
+      key.put(new Datum[] { DatumFactory.createInt4(randomKey) });
+      tuple.put(new Datum[] { DatumFactory.createInt4(randomKey),
+          DatumFactory.createInt4(rnd.nextInt(10)),
+          DatumFactory.createText("dept_" + rnd.nextInt(10)) });
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      writer.write(key, offset);
+    }
+    appender.flush();
+    appender.close();
+    writer.close();
+
+    TableDesc desc = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
+        sm.getTablePath("employee"));
+    catalog.createTable(desc);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testEqual() throws Exception {
+    // Skip this test when the v2 storage manager is enabled.
+    if (conf.getBoolean("tajo.storage.manager.v2", false)) {
+      return;
+    }
+    this.rndKey = rnd.nextInt(250);
+    final String QUERY = "select * from employee where managerId = " + rndKey;
+    
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    Expr expr = analyzer.parse(QUERY);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    int tupleCount = this.randomValues.get(rndKey);
+    int counter = 0;
+    exec.init();
+    while (exec.next() != null) {
+      counter ++;
+    }
+    exec.close();
+    assertEquals(tupleCount, counter);
+  }
+
+  private class TmpPlanner extends PhysicalPlannerImpl {
+    public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
+      super(conf, sm);
+    }
+
+    @Override
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
+        throws IOException {
+      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
+          "Error: There is no table matched to %s", scanNode.getTableName());
+
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
+          ctx.getTables(scanNode.getTableName()));
+      
+      Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
+
+      return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath, idxSchema, comp, datum);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
new file mode 100644
index 0000000..ff3befe
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = "target/test-data/TestExternalSortExec";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private final int numTuple = 100000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + i),
+      });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576) +
+        " MB)");
+
+    employee = new TableDesc("default.employee", schema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId from employee order by managerId, empId"
+  };
+
+  @Test
+  public final void testNext() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planned with the user's optimization hint
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
+
+      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
+          ((MemSortExec)sortExec).getPlan(), scan);
+      proj.setChild(extSort);
+    }
+
+    Tuple tuple;
+    Tuple preVal = null;
+    Tuple curVal;
+    int cnt = 0;
+    exec.init();
+    long start = System.currentTimeMillis();
+    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+        new SortSpec[]{
+            new SortSpec(new Column("managerid", Type.INT4)),
+            new SortSpec(new Column("empid", Type.INT4))
+        });
+
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    long end = System.currentTimeMillis();
+    assertEquals(numTuple, cnt);
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(numTuple, cnt);
+    exec.close();
+    System.out.println("Sort Time: " + (end - start) + " msc");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
new file mode 100644
index 0000000..b05688d
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestFullOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name   | last_name   | dep_id | salary | job_id
+    // ----------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x= 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [3] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
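+    // pin this join node to the in-memory hash join algorithm; the assertTrue
+    // below verifies that the physical planner honored the enforcer hint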
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+
+  @Test
+  public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), merged,
+        workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
new file mode 100644
index 0000000..0386179
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestFullOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static final Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String DEP4_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dept_0    | 1000
+    //  1     | dept_1    | 1001
+    //  2     | dept_2    | 1002
+    //  3     | dept_3    | 1003
+    //  4     | dept_4    | 1004
+    //  5     | dept_5    | 1005
+    //  6     | dept_6    | 1006
+    //  7     | dept_7    | 1007
+    //  8     | dept_8    | 1008
+    //  9     | dept_9    | 1009
+    // 10     | dept_10   | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    appender4.init();
+    Tuple tuple4 = new VTuple(dep4Schema.size());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path);
+    catalog.createTable(dep4);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     | firstname_11 | lastname_11 |  1     | 123    | 101
+    //  13     | firstname_13 | lastname_13 |  3     | 369    | 103
+    //  15     | firstname_15 | lastname_15 |  5     | 615    | null
+    //  17     | firstname_17 | lastname_17 |  7     | 861    | null
+    //  19     | firstname_19 | lastname_19 |  9     | 1107   | null
+    //  21     | firstname_21 | lastname_21 |  1     | 123    | 101
+    //  23     | firstname_23 | lastname_23 |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
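+    // dep_id 1 and 3 each get two employees (emp_id 1x and 2x) with job_id 101/103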
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(y),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
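+    // dep_id 5, 7 and 9 each get one employee with a null job_id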
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(x),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls; the right side's keys continue past the left side's
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from phone3 full outer join emp3 on emp3.emp_id = phone3.emp_id",
+  };
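+
+  // Expected full outer join row counts, derived from the fixture tables in setUp():
+  //  [0] dep3 has dep_id 0..9; emp3 has dep_id 1,1,3,3,5,7,9
+  //      -> 7 matched rows + 5 unmatched dep3 rows (0,2,4,6,8) = 12
+  //  [1]/[2] job3 has job_id 101..103; emp3 has job_id 101,101,103,103 and three nulls
+  //      -> 4 matched rows + 1 unmatched job3 row (102) + 3 null-keyed emp3 rows = 8
+  //  [3] dep4 has dep_id 0..10 -> 7 matched + 6 unmatched dep4 rows (0,2,4,6,8,10) = 13
+  //  [4]/[5] phone3 is empty -> all 7 emp3 rows come out null-padded = 7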
+
+  @Test
+  public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
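+    // pin this join node to the sort-merge join algorithm; the assertTrue
+    // below verifies that the physical planner honored the enforcer hint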
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags =
+        StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags =
+        StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // even if the optimizer initially chose a hash join with reversed operand
+    // order, the enforcer must still produce a merge full outer join
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(13, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[5]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}