Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:15:56 UTC

[02/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
new file mode 100644
index 0000000..acd663f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.junit.Test;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestUniformRangePartition {
+  /**
+   * Verifies increment and per-digit overflow.
+   */
+  @Test
+  public void testIncrement1() {
+    Schema schema = new Schema()
+    .addColumn("l_returnflag", Type.TEXT)
+    .addColumn("l_linestatus", Type.TEXT);
+    Tuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    Tuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("D"));
+    e.put(1, DatumFactory.createText("C"));
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner =
+        new UniformRangePartition(schema, expected);
+    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
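+    // 12 = 4 values in [A..D] x 3 values in [A..C]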
+
+    String [] result = new String[12];
+    result[0] = "AA";
+    result[1] = "AB";
+    result[2] = "AC";
+    result[3] = "BA";
+    result[4] = "BB";
+    result[5] = "BC";
+    result[6] = "CA";
+    result[7] = "CB";
+    result[8] = "CC";
+    result[9] = "DA";
+    result[10] = "DB";
+    result[11] = "DC";
+
+    Tuple end = partitioner.increment(s, 1, 1);
+    assertEquals("A", end.get(0).asChars());
+    assertEquals("B", end.get(1).asChars());
+    for (int i = 2; i < 11; i++) {
+      end = partitioner.increment(end, 1, 1);
+      assertEquals(result[i].charAt(0), end.get(0).asChars().charAt(0));
+      assertEquals(result[i].charAt(1), end.get(1).asChars().charAt(0));
+    }
+  }
+
+  /**
+   * Verifies overflow when the increment exceeds the range of the last digit.
+   */
+  @Test
+  public void testIncrement2() {
+    Schema schema = new Schema()
+    .addColumn("l_returnflag", Type.TEXT)
+    .addColumn("l_linestatus", Type.TEXT);
+    Tuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    Tuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("D"));
+    e.put(1, DatumFactory.createText("C"));
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner =
+        new UniformRangePartition(schema, expected);
+    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+
+    String [] result = new String[12];
+    result[0] = "AA";
+    result[1] = "AB";
+    result[2] = "AC";
+    result[3] = "BA";
+    result[4] = "BB";
+    result[5] = "BC";
+    result[6] = "CA";
+    result[7] = "CB";
+    result[8] = "CC";
+    result[9] = "DA";
+    result[10] = "DB";
+    result[11] = "DC";
+
+    Tuple end = partitioner.increment(s, 6, 1);
+    assertEquals("C", end.get(0).asChars());
+    assertEquals("A", end.get(1).asChars());
+    end = partitioner.increment(end, 5, 1);
+    assertEquals("D", end.get(0).asChars());
+    assertEquals("C", end.get(1).asChars());
+  }
+
+  /**
+   * Verifies the case where two or more digits overflow at once.
+   */
+  @Test
+  public void testIncrement3() {
+    Schema schema = new Schema()
+    .addColumn("l_returnflag", Type.TEXT)
+    .addColumn("l_linestatus", Type.TEXT)
+    .addColumn("final", Type.TEXT);
+
+    Tuple s = new VTuple(3);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    s.put(2, DatumFactory.createText("A"));
+    Tuple e = new VTuple(3);
+    e.put(0, DatumFactory.createText("D")); //  4
+    e.put(1, DatumFactory.createText("B")); //  2
+    e.put(2, DatumFactory.createText("C")); // x3 = 24
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner =
+        new UniformRangePartition(schema, expected);
+    assertEquals(24, TupleUtil.computeCardinality(schema, expected));
+
+    Tuple overflowBefore = partitioner.increment(s, 5, 2);
+    assertEquals("A", overflowBefore.get(0).asChars());
+    assertEquals("B", overflowBefore.get(1).asChars());
+    assertEquals("C", overflowBefore.get(2).asChars());
+    Tuple overflowed = partitioner.increment(overflowBefore, 1, 2);
+    assertEquals("B", overflowed.get(0).asChars());
+    assertEquals("A", overflowed.get(1).asChars());
+    assertEquals("A", overflowed.get(2).asChars());
+  }
+
+  @Test
+  public void testIncrement4() {
+    Schema schema = new Schema()
+    .addColumn("l_orderkey", Type.INT8)
+    .addColumn("l_linenumber", Type.INT8);
+    Tuple s = new VTuple(2);
+    s.put(0, DatumFactory.createInt8(10));
+    s.put(1, DatumFactory.createInt8(20));
+    Tuple e = new VTuple(2);
+    e.put(0, DatumFactory.createInt8(19));
+    e.put(1, DatumFactory.createInt8(39));
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner =
+        new UniformRangePartition(schema, expected);
+    assertEquals(200, partitioner.getTotalCardinality().longValue());
+
+    Tuple range2 = partitioner.increment(s, 100, 1);
+    assertEquals(15, range2.get(0).asInt4());
+    assertEquals(20, range2.get(1).asInt4());
+    Tuple range3 = partitioner.increment(range2, 99, 1);
+    assertEquals(19, range3.get(0).asInt4());
+    assertEquals(39, range3.get(1).asInt4());
+  }
+
+  @Test public void testIncrement5() {
+    Schema schema = new Schema()
+    .addColumn("l_orderkey", Type.INT8)
+    .addColumn("l_linenumber", Type.INT8)
+    .addColumn("final", Type.INT8);
+    Tuple s = new VTuple(3);
+    s.put(0, DatumFactory.createInt8(1));
+    s.put(1, DatumFactory.createInt8(1));
+    s.put(2, DatumFactory.createInt8(1));
+    Tuple e = new VTuple(3);
+    e.put(0, DatumFactory.createInt8(4)); // 4
+    e.put(1, DatumFactory.createInt8(2)); // 2
+    e.put(2, DatumFactory.createInt8(3)); //x3 = 24
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner
+        = new UniformRangePartition(schema, expected);
+    assertEquals(24, partitioner.getTotalCardinality().longValue());
+
+    Tuple beforeOverflow = partitioner.increment(s, 5, 2);
+    assertEquals(1, beforeOverflow.get(0).asInt8());
+    assertEquals(2, beforeOverflow.get(1).asInt8());
+    assertEquals(3, beforeOverflow.get(2).asInt8());
+    Tuple overflow = partitioner.increment(beforeOverflow, 1, 2);
+    assertEquals(2, overflow.get(0).asInt8());
+    assertEquals(1, overflow.get(1).asInt8());
+    assertEquals(1, overflow.get(2).asInt8());
+  }
+
+  @Test
+  public void testIncrement6() {
+    Schema schema = new Schema()
+      .addColumn("l_orderkey", Type.FLOAT8)
+      .addColumn("l_linenumber", Type.FLOAT8)
+      .addColumn("final", Type.FLOAT8);
+    Tuple s = new VTuple(3);
+    s.put(0, DatumFactory.createFloat8(1.1d));
+    s.put(1, DatumFactory.createFloat8(1.1d));
+    s.put(2, DatumFactory.createFloat8(1.1d));
+    Tuple e = new VTuple(3);
+    e.put(0, DatumFactory.createFloat8(4.1d)); // 4
+    e.put(1, DatumFactory.createFloat8(2.1d)); // 2
+    e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24
+
+    TupleRange expected = new TupleRange(schema, s, e);
+
+    UniformRangePartition partitioner =
+        new UniformRangePartition(schema, expected);
+    assertEquals(24, partitioner.getTotalCardinality().longValue());
+
+    Tuple beforeOverflow = partitioner.increment(s, 5, 2);
+    assertTrue(1.1d == beforeOverflow.get(0).asFloat8());
+    assertTrue(2.1d == beforeOverflow.get(1).asFloat8());
+    assertTrue(3.1d == beforeOverflow.get(2).asFloat8());
+    Tuple overflow = partitioner.increment(beforeOverflow, 1, 2);
+    assertTrue(2.1d == overflow.get(0).asFloat8());
+    assertTrue(1.1d == overflow.get(1).asFloat8());
+    assertTrue(1.1d == overflow.get(2).asFloat8());
+  }
+
+  @Test
+  public void testPartition() {
+    Schema schema = new Schema();
+    schema.addColumn("l_returnflag", Type.TEXT);
+    schema.addColumn("l_linestatus", Type.TEXT);
+    Tuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("F"));
+    Tuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("R"));
+    e.put(1, DatumFactory.createText("O"));
+    TupleRange expected = new TupleRange(schema, s, e);
+    RangePartitionAlgorithm partitioner
+        = new UniformRangePartition(schema, expected, true);
+    TupleRange [] ranges = partitioner.partition(31);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+    }
+  }
+
+  @Test
+  public void testPartitionForOnePartNum() {
+    Schema schema = new Schema()
+      .addColumn("l_returnflag", Type.TEXT)
+      .addColumn("l_linestatus", Type.TEXT);
+    Tuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("F"));
+    Tuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("R"));
+    e.put(1, DatumFactory.createText("O"));
+    TupleRange expected = new TupleRange(schema, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(schema, expected, true);
+    TupleRange [] ranges = partitioner.partition(1);
+
+    assertEquals(expected, ranges[0]);
+  }
+}
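
For context on the assertions above: UniformRangePartition's increment behaves
like a mixed-radix counter, where each sort column is a digit whose radix is
the number of distinct values in that column's range, and an overflow carries
into the column to its left. The following standalone sketch (illustrative
only; the real method operates on Tuples/Datums and takes an extra position
argument not modeled here) shows the carry logic the tests verify:

    public class MixedRadixIncrementSketch {
      // Adds 'amount' to the least-significant digit and propagates carries
      // right-to-left; digits[i] ranges over [0, radix[i]).
      static int[] increment(int[] digits, int[] radix, long amount) {
        int[] out = digits.clone();
        long carry = amount;
        for (int i = out.length - 1; i >= 0 && carry > 0; i--) {
          long v = out[i] + carry;
          out[i] = (int) (v % radix[i]);
          carry = v / radix[i];
        }
        if (carry > 0) {
          throw new ArithmeticException("increment overflows the partition range");
        }
        return out;
      }

      public static void main(String[] args) {
        int[] radix = {4, 3};  // columns over [A..D] and [A..C], as in testIncrement2
        int[] cur = increment(new int[]{0, 0}, radix, 6);  // "AA" + 6
        System.out.println(cur[0] + "," + cur[1]);         // 2,0 -> "CA"
        cur = increment(cur, radix, 5);                    // "CA" + 5
        System.out.println(cur[0] + "," + cur[1]);         // 3,2 -> "DC"
      }
    }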

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
new file mode 100644
index 0000000..0418221
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.GlobalPlanner;
+import org.apache.tajo.storage.*;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestGlobalQueryOptimizer {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static GlobalPlanner planner;
+  private static Schema schema;
+  private static QueryAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+  private static QueryId queryId;
+  private static GlobalOptimizer optimizer;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    int i, j;
+
+    schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    schema.addColumn("salary", Type.INT4);
+
+    TableMeta meta;
+
+    conf = new TajoConf(util.getConfiguration());
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    StorageManager sm = new StorageManager(util.getConfiguration());
+    FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
+        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
+    catalog.registerFunction(funcDesc);
+    FileSystem fs = sm.getFileSystem();
+
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+
+    planner = new GlobalPlanner(conf, catalog, new StorageManager(conf),
+        dispatcher.getEventHandler());
+    analyzer = new QueryAnalyzer(catalog);
+    logicalPlanner = new LogicalPlanner(catalog);
+
+    int tbNum = 2;
+    int tupleNum;
+    Appender appender;
+    Tuple t = new VTuple(4);
+    t.put(new Datum[] {
+        DatumFactory.createInt4(1), DatumFactory.createInt4(32),
+        DatumFactory.createText("h"), DatumFactory.createInt4(10)});
+
+    for (i = 0; i < tbNum; i++) {
+      meta = CatalogUtil.newTableMeta((Schema) schema.clone(), StoreType.CSV);
+      meta.putOption(CSVFile.DELIMITER, ",");
+
+      Path dataRoot = sm.getBaseDir();
+      Path tablePath = StorageUtil.concatPath(dataRoot, "table"+i, "file.csv");
+      if (fs.exists(tablePath.getParent())) {
+        fs.delete(tablePath.getParent(), true);
+      }
+      fs.mkdirs(tablePath.getParent());
+      appender = StorageManager.getAppender(conf, meta, tablePath);
+      appender.init();
+      tupleNum = 100;
+      for (j = 0; j < tupleNum; j++) {
+        appender.addTuple(t);
+      }
+      appender.close();
+
+      TableDesc desc = CatalogUtil
+          .newTableDesc("table" + i, (TableMeta) meta.clone(), sm.getTablePath("table" + i));
+      catalog.addTable(desc);
+    }
+
+    QueryIdFactory.reset();
+    queryId = QueryIdFactory.newQueryId();
+    optimizer = new GlobalOptimizer();
+  }
+  
+  @AfterClass
+  public static void terminate() throws IOException {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testReduceLogicalQueryUnitSteps() throws IOException {
+    PlanningContext context = analyzer.parse(
+        "select table0.age,table0.salary,table1.salary from table0,table1 where table0.salary = table1.salary order by table0.age");
+    LogicalNode plan = logicalPlanner.createPlan(context);
+    plan = LogicalOptimizer.optimize(context, plan);
+
+    MasterPlan globalPlan = planner.build(queryId,
+        (LogicalRootNode) plan);
+    globalPlan = optimizer.optimize(globalPlan);
+    
+    ExecutionBlock unit = globalPlan.getRoot();
+    StoreTableNode store = unit.getStoreTableNode();
+    assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
+    ProjectionNode proj = (ProjectionNode) store.getSubNode();
+    assertEquals(ExprType.SORT, proj.getSubNode().getType());
+    SortNode sort = (SortNode) proj.getSubNode();
+    assertEquals(ExprType.SCAN, sort.getSubNode().getType());
+    ScanNode scan = (ScanNode) sort.getSubNode();
+    
+    assertTrue(unit.hasChildBlock());
+    unit = unit.getChildBlock(scan);
+    store = unit.getStoreTableNode();
+    assertEquals(ExprType.SORT, store.getSubNode().getType());
+    sort = (SortNode) store.getSubNode();
+    assertEquals(ExprType.JOIN, sort.getSubNode().getType());
+    
+    assertTrue(unit.hasChildBlock());
+    for (ScanNode prevscan : unit.getScanNodes()) {
+      ExecutionBlock prev = unit.getChildBlock(prevscan);
+      store = prev.getStoreTableNode();
+      assertEquals(ExprType.SCAN, store.getSubNode().getType());
+    }
+  }
+}
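
The assertions in testReduceLogicalQueryUnitSteps walk the optimized global
plan from the root downward; the execution-block tree they expect is roughly:

    root block:   STORE -> PROJECTION -> SORT -> SCAN
    child block:  STORE -> SORT -> JOIN
    leaf blocks:  STORE -> SCAN   (one per scanned input table)

The sort appears in both the child block and the root block, reflecting a
distributed two-phase sort over the join output, with each block's store node
materializing intermediate results for the block above it.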

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestReceiveNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestReceiveNode.java
new file mode 100644
index 0000000..848b0cd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestReceiveNode.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.extended;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.apache.tajo.engine.planner.logical.ExprType;
+
+import java.net.URI;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestReceiveNode {
+  @Test
+  public final void testReceiveNode() throws CloneNotSupportedException {
+    ReceiveNode rec = new ReceiveNode(PipeType.PULL, RepartitionType.HASH);
+    
+    URI uri1 = URI.create("http://192.168.0.1:2190/?part=0");
+    URI uri2 = URI.create("http://192.168.0.2:2190/?part=1");
+    URI uri3 = URI.create("http://192.168.0.3:2190/?part=2");
+    URI uri4 = URI.create("http://192.168.0.4:2190/?part=3");
+    List<URI> set1 = Lists.newArrayList(uri1, uri2);
+    List<URI> set2 = Lists.newArrayList(uri3, uri4);
+    
+    rec.addData("test1", set1.get(0));
+    rec.addData("test1", set1.get(1));
+    rec.addData("test2", set2.get(0));
+    rec.addData("test2", set2.get(1));
+    
+    assertEquals(ExprType.RECEIVE, rec.getType());
+    assertEquals(PipeType.PULL, rec.getPipeType());
+    assertEquals(RepartitionType.HASH, rec.getRepartitionType());    
+    assertEquals(set1, Lists.newArrayList(rec.getSrcURIs("test1")));
+    assertEquals(set2, Lists.newArrayList(rec.getSrcURIs("test2")));
+    
+    ReceiveNode rec2 = (ReceiveNode) rec.clone();
+    assertEquals(ExprType.RECEIVE, rec2.getType());
+    assertEquals(PipeType.PULL, rec2.getPipeType());
+    assertEquals(RepartitionType.HASH, rec2.getRepartitionType());    
+    assertEquals(set1, Lists.newArrayList(rec2.getSrcURIs("test1")));
+    assertEquals(set2, Lists.newArrayList(rec2.getSrcURIs("test2")));
+    
+    assertEquals(rec, rec2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestSendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestSendNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestSendNode.java
new file mode 100644
index 0000000..faa4289
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/extended/TestSendNode.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.extended;
+
+import org.junit.Test;
+import org.apache.tajo.engine.planner.logical.ExprType;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSendNode {
+  @Test
+  public final void testSendNode() throws CloneNotSupportedException {
+    SendNode send = new SendNode(PipeType.PULL, RepartitionType.HASH);
+    send.putDestURI(0, URI.create("http://localhost:2190"));
+    send.putDestURI(1, URI.create("http://localhost:2191"));
+    
+    assertEquals(ExprType.SEND, send.getType());
+    assertEquals(PipeType.PULL, send.getPipeType());
+    assertEquals(RepartitionType.HASH, send.getRepartitionType());
+    assertEquals(URI.create("http://localhost:2190"), send.getDestURI(0));
+    assertEquals(URI.create("http://localhost:2191"), send.getDestURI(1));
+    
+    SendNode send2 = (SendNode) send.clone();
+    assertEquals(ExprType.SEND, send2.getType());
+    assertEquals(PipeType.PULL, send2.getPipeType());
+    assertEquals(RepartitionType.HASH, send2.getRepartitionType());
+    assertEquals(URI.create("http://localhost:2190"), send2.getDestURI(0));
+    assertEquals(URI.create("http://localhost:2191"), send2.getDestURI(1));
+    
+    assertEquals(send, send2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
new file mode 100644
index 0000000..c04f4ba
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+  private Path testDir;
+
+  private static int OUTER_TUPLE_NUM = 1000;
+  private static int INNER_TUPLE_NUM = 1000;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerId", Type.INT4);
+    schema.addColumn("empId", Type.INT4);
+    schema.addColumn("memId", Type.INT4);
+    schema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people",
+      "select managerId, e.empId, deptName, e.memId from employee as e " +
+          "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
+
+  @Test
+  public final void testCrossJoin() throws IOException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context);
+    //LogicalOptimizer.optimize(ctx, plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    /*ProjectionExec proj = (ProjectionExec) exec;
+    NLJoinExec nlJoin = (NLJoinExec) proj.getChild();
+    SeqScanExec scanOuter = (SeqScanExec) nlJoin.getOuter();
+    SeqScanExec scanInner = (SeqScanExec) nlJoin.getInner();
+
+    BNLJoinExec bnl = new BNLJoinExec(ctx, nlJoin.getJoinNode(), scanOuter,
+        scanInner);
+    proj.setsubOp(bnl);*/
+
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(OUTER_TUPLE_NUM * INNER_TUPLE_NUM / 2, i); // 1000 outer rows x 500 inner rows
+  }
+
+  @Test
+  public final void testInnerJoin() throws IOException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
+    TaskAttemptContext ctx =
+        new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+            merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(context);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    SeqScanExec scanOuter = null;
+    SeqScanExec scanInner = null;
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    JoinNode joinNode = null;
+    if (proj.getChild() instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) proj.getChild();
+      ExternalSortExec sortOut = (ExternalSortExec) join.getOuterChild();
+      ExternalSortExec sortIn = (ExternalSortExec) join.getInnerChild();
+      scanOuter = (SeqScanExec) sortOut.getChild();
+      scanInner = (SeqScanExec) sortIn.getChild();
+      joinNode = join.getJoinNode();
+    } else if (proj.getChild() instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) proj.getChild();
+      scanOuter = (SeqScanExec) join.getOuterChild();
+      scanInner = (SeqScanExec) join.getInnerChild();
+      joinNode = join.getPlan();
+    }
+
+    BNLJoinExec bnl = new BNLJoinExec(ctx, joinNode, scanOuter,
+        scanInner);
+    proj.setChild(bnl);
+
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt(0).asInt4());
+      assertTrue(i == tuple.getInt(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
+      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(INNER_TUPLE_NUM / 2, count);
+  }
+}
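
testInnerJoin above splices a BNLJoinExec in place of whatever join the
planner chose. Block nested-loop join buffers a block of outer tuples and
scans the inner input once per block rather than once per outer tuple. A
self-contained sketch of the strategy (simplified; the real executor consumes
Tuple streams and evaluates the join condition from the plan):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiPredicate;

    public class BnlJoinSketch {
      static <T> List<Object[]> bnlJoin(List<T> outer, List<T> inner,
                                        int blockSize, BiPredicate<T, T> cond) {
        List<Object[]> result = new ArrayList<>();
        for (int b = 0; b < outer.size(); b += blockSize) {
          List<T> block = outer.subList(b, Math.min(b + blockSize, outer.size()));
          for (T in : inner) {        // one inner scan per block, not per tuple
            for (T out : block) {
              if (cond.test(out, in)) {
                result.add(new Object[]{out, in});
              }
            }
          }
        }
        return result;
      }
    }

With a condition that is always true this degenerates to the cross join of
testCrossJoin, whose expected row count is 1000 outer x 500 inner tuples.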

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
new file mode 100644
index 0000000..6b9f16c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestBSTIndexExec {
+
+  private TajoConf conf;
+  private Path idxPath;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+  private Schema idxSchema;
+  private TupleComparator comp;
+  private BSTIndex.BSTIndexWriter writer;
+  private HashMap<Integer, Integer> randomValues;
+  private int rndKey = -1;
+  private FileSystem fs;
+  private TableMeta meta;
+  private Path tablePath;
+
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TajoTestingCluster util;
+
+  @Before
+  public void setup() throws Exception {
+    this.randomValues = new HashMap<Integer, Integer>();
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
+    sm = StorageManager.get(conf, workDir);
+
+    idxPath = new Path(workDir, "test.idx");
+
+    Schema schema = new Schema();
+    schema.addColumn("managerId", Type.INT4);
+    schema.addColumn("empId", Type.INT4);
+    schema.addColumn("deptName", Type.TEXT);
+
+    this.idxSchema = new Schema();
+    idxSchema.addColumn("managerId", Type.INT4);
+    SortSpec[] sortKeys = new SortSpec[1];
+    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerId"), true, false);
+    this.comp = new TupleComparator(idxSchema, sortKeys);
+
+    this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
+        BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
+    writer.setLoadNum(100);
+    writer.open();
+    long offset;
+
+    meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
+    fs = tablePath.getFileSystem(conf);
+    fs.mkdirs(tablePath.getParent());
+
+    FileAppender appender = (FileAppender)StorageManager.getAppender(conf, meta, tablePath);
+    appender.init();
+    Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+    for (int i = 0; i < 10000; i++) {
+      
+      Tuple key = new VTuple(this.idxSchema.getColumnNum());
+      int rndKey = rnd.nextInt(250);
+      if(this.randomValues.containsKey(rndKey)) {
+        int t = this.randomValues.remove(rndKey) + 1;
+        this.randomValues.put(rndKey, t);
+      } else {
+        this.randomValues.put(rndKey, 1);
+      }
+      
+      key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
+      tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
+          DatumFactory.createInt4(rnd.nextInt(10)),
+          DatumFactory.createText("dept_" + rnd.nextInt(10)) });
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      writer.write(key, offset);
+    }
+    appender.flush();
+    appender.close();
+    writer.close();
+
+    TableDesc desc = new TableDescImpl("employee", meta,
+        sm.getTablePath("employee"));
+    catalog.addTable(desc);
+
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testEqual() throws Exception {
+    
+    this.rndKey = rnd.nextInt(250);
+    final String QUERY = "select * from employee where managerId = " + rndKey;
+    
+    Fragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+    PlanningContext context = analyzer.parse(QUERY);
+    LogicalNode plan = planner.createPlan(context);
+
+    plan =  LogicalOptimizer.optimize(context, plan);
+
+    TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    int tupleCount = this.randomValues.get(rndKey);
+    int counter = 0;
+
+    exec.init();
+    while (exec.next() != null) {
+      counter++;
+    }
+    exec.close();
+    assertEquals(tupleCount, counter);
+  }
+
+  private class TmpPlanner extends PhysicalPlannerImpl {
+    public TmpPlanner(TajoConf conf, StorageManager sm) {
+      super(conf, sm);
+    }
+
+    @Override
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+        throws IOException {
+      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableId()),
+          "Error: There is no table matched to %s", scanNode.getTableId());
+
+      Fragment[] fragments = ctx.getTables(scanNode.getTableId());
+      
+      Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
+
+      return new BSTIndexScanExec(ctx, sm, scanNode, fragments[0], idxPath,
+          idxSchema, comp, datum);
+    }
+  }
+}
\ No newline at end of file
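
setup() above writes one (key, offset) index entry per appended row, and
testEqual checks that an index scan for a random managerId returns exactly
the number of rows recorded for that key. A toy model of the mapping the
index provides (a TreeMap stands in for the on-disk two-level structure):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.TreeMap;

    public class BstIndexSketch {
      // key -> file offsets of all rows with that key (duplicates allowed).
      private final TreeMap<Integer, List<Long>> index = new TreeMap<>();

      void write(int key, long offset) {
        index.computeIfAbsent(key, k -> new ArrayList<>()).add(offset);
      }

      // Equality lookup: every offset the scan must seek to for this key.
      List<Long> lookupEqual(int key) {
        return index.getOrDefault(key, Collections.emptyList());
      }
    }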

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
new file mode 100644
index 0000000..85d9380
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = "target/test-data/TestExternalSortExec";
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+  private Path testDir;
+
+
+  private final int numTuple = 1000000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+
+  private TableDesc employee;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    sm = StorageManager.get(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerId", Type.INT4);
+    schema.addColumn("empId", Type.INT4);
+    schema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.enableStats();
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + 123) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    System.out.println("Total Rows: " + appender.getStats().getNumRows());
+
+    employee = new TableDescImpl("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId, deptName from employee order by managerId, empId desc"
+  };
+
+  @Test
+  public final void testNext() throws IOException {
+    Fragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planned with the user's optimization hint
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = (UnaryPhysicalExec) proj.getChild();
+      SeqScanExec scan = (SeqScanExec)sortExec.getChild();
+
+      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
+          ((MemSortExec)sortExec).getPlan(), scan);
+      proj.setChild(extSort);
+    }
+
+    Tuple tuple;
+    Datum preVal = null;
+    Datum curVal;
+    int cnt = 0;
+    exec.init();
+    long start = System.currentTimeMillis();
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple.get(0);
+      if (preVal != null) {
+        assertTrue(preVal.lessThanEqual(curVal).asBool());
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    long end = System.currentTimeMillis();
+    exec.close();
+    assertEquals(numTuple, cnt);
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple.get(0);
+      if (preVal != null) {
+        assertTrue(preVal.lessThanEqual(curVal).asBool());
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(numTuple, cnt);
+
+    System.out.println("Sort Time: " + (end - start) + " msc");
+  }
+}
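
testNext checks two properties of ExternalSortExec: the million-tuple output
is non-decreasing on the first sort key, and rescan() reproduces the same
ordering. The algorithm under test is the classic external sort; reduced to
in-memory lists (the real executor spills sorted runs to disk), it is:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class ExternalSortSketch {
      static List<Integer> externalSort(List<Integer> input, int runSize) {
        // Phase 1: cut the input into memory-sized runs and sort each one.
        List<List<Integer>> runs = new ArrayList<>();
        for (int i = 0; i < input.size(); i += runSize) {
          List<Integer> run = new ArrayList<>(
              input.subList(i, Math.min(i + runSize, input.size())));
          Collections.sort(run);
          runs.add(run);
        }
        // Phase 2: k-way merge of run heads via a min-heap of {run, position}.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            Comparator.comparingInt((int[] e) -> runs.get(e[0]).get(e[1])));
        for (int r = 0; r < runs.size(); r++) {
          if (!runs.get(r).isEmpty()) heap.add(new int[]{r, 0});
        }
        List<Integer> sorted = new ArrayList<>(input.size());
        while (!heap.isEmpty()) {
          int[] e = heap.poll();
          sorted.add(runs.get(e[0]).get(e[1]));
          if (e[1] + 1 < runs.get(e[0]).size()) heap.add(new int[]{e[0], e[1] + 1});
        }
        return sorted;
      }
    }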

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
new file mode 100644
index 0000000..65af200
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
+        StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  @Test
+  public final void testInnerJoin() throws IOException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    if (proj.getChild() instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) proj.getChild();
+      ExternalSortExec sortout = (ExternalSortExec) join.getOuterChild();
+      ExternalSortExec sortin = (ExternalSortExec) join.getInnerChild();
+      SeqScanExec scanout = (SeqScanExec) sortout.getChild();
+      SeqScanExec scanin = (SeqScanExec) sortin.getChild();
+
+      HashJoinExec hashjoin = new HashJoinExec(ctx, join.getJoinNode(), scanout, scanin);
+      proj.setChild(hashjoin);
+
+      exec = proj;
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt(0).asInt4());
+      assertTrue(i == tuple.getInt(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
+      assertTrue(10 + i == tuple.getInt(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10 / 2, count);
+  }
+}
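
The HashJoinExec spliced in above follows the standard build/probe pattern:
build a hash table over one input keyed on its join columns, then stream the
other input and emit matches. Sketched over int-array rows (simplified; the
real executor merges Tuples according to the join node's target schema):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HashJoinSketch {
      static List<int[]> hashJoin(List<int[]> outer, List<int[]> inner,
                                  int outerKeyCol, int innerKeyCol) {
        // Build phase: hash the inner input on its key column.
        Map<Integer, List<int[]>> table = new HashMap<>();
        for (int[] row : inner) {
          table.computeIfAbsent(row[innerKeyCol], k -> new ArrayList<>()).add(row);
        }
        // Probe phase: stream the outer input and concatenate matching rows.
        List<int[]> joined = new ArrayList<>();
        for (int[] out : outer) {
          for (int[] match : table.getOrDefault(out[outerKeyCol],
              Collections.emptyList())) {
            int[] merged = new int[out.length + match.length];
            System.arraycopy(out, 0, merged, 0, out.length);
            System.arraycopy(match, 0, merged, out.length, match.length);
            joined.add(merged);
          }
        }
        return joined;
      }
    }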

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
new file mode 100644
index 0000000..f0d846c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHashPartitioner {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testGetPartition() {   
+    Tuple tuple1 = new VTuple(3);    
+    tuple1.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(3)
+    });
+    Tuple tuple2 = new VTuple(3);    
+    tuple2.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(4)
+    });
+    Tuple tuple3 = new VTuple(3);    
+    tuple3.put(new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(5)
+    });
+    Tuple tuple4 = new VTuple(3);    
+    tuple4.put(new Datum[] {
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(3)
+    });
+    Tuple tuple5 = new VTuple(3);    
+    tuple5.put(new Datum[] {
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(2),
+        DatumFactory.createInt4(4)
+    });
+    
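+    // Partition on columns 0 and 1 only: tuple1-3 share (1, 2) and tuple4-5
+    // share (2, 2), so each group must land in a single partition.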
+    int[] partKeys = {0, 1};
+    Partitioner p = new HashPartitioner(partKeys, 2);
+    
+    int part1 = p.getPartition(tuple1);
+    assertEquals(part1, p.getPartition(tuple2));
+    assertEquals(part1, p.getPartition(tuple3));
+    
+    int part2 = p.getPartition(tuple4);
+    assertEquals(part2, p.getPartition(tuple5));    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
new file mode 100644
index 0000000..9201838
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    FileSystem fs = testDir.getFileSystem(conf);
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
+        StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
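+    // employee rows: managerId/empId i = 0..9, then odd i = 11..19;
+    // memId is always empId + 10, mirroring people.fk_memId below.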
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 11; i < 20; i+=2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta,
+        employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
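+    // people rows: odd empId 1..9, then empId 10..19; fk_memId is empId + 10,
+    // so exactly ten empId/memId pairs match the employee table above.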
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 10; i < 20; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  @Test
+  public final void testInnerJoin() throws IOException {
+    Fragment[] empFrags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = sm.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planned with the user's optimization hint
+    if (!(proj.getChild() instanceof MergeJoinExec)) {
+      HashJoinExec hashJoin = (HashJoinExec) proj.getChild();
+      SeqScanExec outerScan = (SeqScanExec) hashJoin.getOuterChild();
+      SeqScanExec innerScan = (SeqScanExec) hashJoin.getInnerChild();
+
+      SeqScanExec tmp;
+      if (!outerScan.getTableName().equals("employee")) {
+        tmp = outerScan;
+        outerScan = innerScan;
+        innerScan = tmp;
+      }
+
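+      // A merge join needs both inputs sorted on the join keys, so build
+      // explicit sort plans over (empId, memId) and (empId, fk_memId).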
+      SortSpec[] outerSortKeys = new SortSpec[2];
+      SortSpec[] innerSortKeys = new SortSpec[2];
+
+      Schema employeeSchema = catalog.getTableDesc("employee").getMeta()
+          .getSchema();
+      outerSortKeys[0] = new SortSpec(
+          employeeSchema.getColumnByName("empId"));
+      outerSortKeys[1] = new SortSpec(
+          employeeSchema.getColumnByName("memId"));
+      SortNode outerSort = new SortNode(outerSortKeys);
+      outerSort.setInSchema(outerScan.getSchema());
+      outerSort.setOutSchema(outerScan.getSchema());
+
+      Schema peopleSchema = catalog.getTableDesc("people").getMeta().getSchema();
+      innerSortKeys[0] = new SortSpec(
+          peopleSchema.getColumnByName("empId"));
+      innerSortKeys[1] = new SortSpec(
+          peopleSchema.getColumnByName("fk_memid"));
+      SortNode innerSort = new SortNode(innerSortKeys);
+      innerSort.setInSchema(innerScan.getSchema());
+      innerSort.setOutSchema(innerScan.getSchema());
+
+      MemSortExec outerSortExec = new MemSortExec(ctx, outerSort, outerScan);
+      MemSortExec innerSortExec = new MemSortExec(ctx, innerSort, innerScan);
+
+      MergeJoinExec mergeJoin = new MergeJoinExec(ctx,
+          hashJoin.getPlan(), outerSortExec, innerSortExec,
+          outerSortKeys, innerSortKeys);
+      proj.setChild(mergeJoin);
+      exec = proj;
+    }
+
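+    // Ten empIds (the odd values 1..19) appear in both tables; verify each
+    // joined row in key order, stepping i by 2 through the odd keys.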
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt(0).asInt4());
+      assertTrue(i == tuple.getInt(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
+      assertTrue(10 + i == tuple.getInt(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10, count); // ten empId/memId pairs match across the two tables
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
new file mode 100644
index 0000000..dde6aa5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerId", Type.INT4);
+    schema.addColumn("empId", Type.INT4);
+    schema.addColumn("memId", Type.INT4);
+    schema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    for (int i = 0; i < 50; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i)});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta,
+        employeePath);
+    catalog.addTable(employee);
+    
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
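+    // people rows: odd empId 1..49 only, so the equi-join with employee
+    // below can match at most 25 of the 50 employee rows.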
+    for (int i = 1; i < 50; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i)});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    
+    people = CatalogUtil.newTableDesc("people", peopleMeta,
+        peoplePath);
+    catalog.addTable(people);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+  
+  String[] QUERIES = {
+    "select managerId, e.empId, deptName, e.memId from employee as e, people",
+    "select managerId, e.empId, deptName, e.memId from employee as e inner join people as p on " +
+        "e.empId = p.empId and e.memId = p.fk_memId"
+  };
+  
+  @Test
+  public final void testCrossJoin() throws IOException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(context);
+    //LogicalOptimizer.optimize(ctx, plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
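+    // Cross join: every employee row pairs with every people row.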
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(50 * 50 / 2, i); // 50 employees x 25 people = 1250 rows
+  }
+
+  @Test
+  public final void testInnerJoin() throws IOException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    PlanningContext context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(context);
+    //LogicalOptimizer.optimize(ctx, plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    
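+    // Only odd empIds 1..49 exist in people, so expect 25 joined rows.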
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt(0).asInt4());
+      assertTrue(i == tuple.getInt(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
+      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(50 / 2, count);
+  }
+}