You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/19 04:08:10 UTC

[45/47] tajo git commit: TAJO-1577: Add test cases to verify join plans. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
new file mode 100644
index 0000000..a8e2a3b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestInnerJoinWithSubQuery extends TestJoinQuery {
+
+  public TestInnerJoinWithSubQuery(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+  }
+
+  @AfterClass
+  public static void classTearDown() throws ServiceException {
+    TestJoinQuery.classTearDown();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual3() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual4() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  public final void testJoinWithJson2() throws Exception {
+    /*
+    select t.n_nationkey, t.n_name, t.n_regionkey, t.n_comment, ps.ps_availqty, s.s_suppkey
+    from (
+      select n_nationkey, n_name, n_regionkey, n_comment
+      from nation n
+      join region r on (n.n_regionkey = r.r_regionkey)
+    ) t
+    join supplier s on (s.s_nationkey = t.n_nationkey)
+    join partsupp ps on (s.s_suppkey = ps.ps_suppkey)
+    where t.n_name in ('ARGENTINA','ETHIOPIA', 'MOROCCO');
+     */
+    ResultSet res = executeJsonQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition5() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition6() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition7() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastSubquery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastSubquery2() throws Exception {
+    runSimpleTests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
deleted file mode 100644
index 8387abd..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ /dev/null
@@ -1,850 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.Int4Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.jdbc.FetchResultSet;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.worker.TajoWorker;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.File;
-import java.io.OutputStream;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.junit.Assert.*;
-
-@Category(IntegrationTest.class)
-public class TestJoinBroadcast extends QueryTestCaseBase {
-  public TestJoinBroadcast() throws Exception {
-    super(TajoConstants.DEFAULT_DATABASE_NAME);
-    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
-    testingCluster.setAllTajoDaemonConfValue(
-        TajoConf.ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024));
-
-    executeDDL("create_lineitem_large_ddl.sql", "lineitem_large");
-    executeDDL("create_customer_large_ddl.sql", "customer_large");
-    executeDDL("create_orders_large_ddl.sql", "orders_large");
-  }
-
-  @Test
-  public final void testCrossJoin() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin2() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin3() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin4() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin5() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testWhereClauseJoin6() throws Exception {
-    ResultSet res = executeQuery();
-    System.out.println(resultSetToString(res));
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testTPCHQ2Join() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoin1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoin2() throws Exception {
-    // large, large, small, small
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoin3() throws Exception {
-    // large, large, small, large, small, small
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithConstantExpr1() throws Exception {
-    // outer join with constant projections
-    //
-    // select c_custkey, orders.o_orderkey, 'val' as val from customer
-    // left outer join orders on c_custkey = o_orderkey;
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithConstantExpr2() throws Exception {
-    // outer join with constant projections
-    //
-    // select c_custkey, o.o_orderkey, 'val' as val from customer left outer join
-    // (select * from orders) o on c_custkey = o.o_orderkey
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithConstantExpr3() throws Exception {
-    // outer join with constant projections
-    //
-    // select a.c_custkey, 123::INT8 as const_val, b.min_name from customer a
-    // left outer join ( select c_custkey, min(c_name) as min_name from customer group by c_custkey) b
-    // on a.c_custkey = b.c_custkey;
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testRightOuterJoin1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testFullOuterJoin1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testJoinCoReferredEvals1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testJoinCoReferredEvalsWithSameExprs1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testJoinCoReferredEvalsWithSameExprs2() throws Exception {
-    // including grouping operator
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testCrossJoinAndCaseWhen() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testCrossJoinWithAsterisk1() throws Exception {
-    // select region.*, customer.* from region, customer;
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testCrossJoinWithAsterisk2() throws Exception {
-    // select region.*, customer.* from customer, region;
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testCrossJoinWithAsterisk3() throws Exception {
-    // select * from customer, region
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public void testCrossJoinWithAsterisk4() throws Exception {
-    // select length(r_regionkey), *, c_custkey*10 from customer, region
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testInnerJoinWithEmptyTable() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithEmptyTable1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithEmptyTable2() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithEmptyTable3() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testLeftOuterJoinWithEmptyTable4() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testRightOuterJoinWithEmptyTable1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testFullOuterJoinWithEmptyTable1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testCrossJoinWithEmptyTable1() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testJoinOnMultipleDatabases() throws Exception {
-    executeString("CREATE DATABASE JOINS");
-    assertDatabaseExists("joins");
-    executeString("CREATE TABLE JOINS.part_ as SELECT * FROM part");
-    assertTableExists("joins.part_");
-    executeString("CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier");
-    assertTableExists("joins.supplier_");
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-    executeString("DROP TABLE JOINS.part_ PURGE");
-    executeString("DROP TABLE JOINS.supplier_ PURGE");
-    executeString("DROP DATABASE JOINS");
-  }
-
-  private MasterPlan getQueryPlan(QueryId queryId) {
-    for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
-      QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
-      if (queryMasterTask != null) {
-        return queryMasterTask.getQuery().getPlan();
-      }
-    }
-
-    fail("Can't find query from workers" + queryId);
-    return null;
-  }
-
-  @Test
-  public final void testBroadcastBasicJoin() throws Exception {
-    ResultSet res = executeQuery();
-    assertEquals(FetchResultSet.class, res.getClass());
-    FetchResultSet resultSet = (FetchResultSet)res;
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    MasterPlan plan = getQueryPlan(resultSet.getQueryId());
-    ExecutionBlock rootEB = plan.getRoot();
-
-    /*
-    |-eb_1395998037360_0001_000006
-       |-eb_1395998037360_0001_000005
-     */
-    assertEquals(1, plan.getChildCount(rootEB.getId()));
-
-    ExecutionBlock firstEB = plan.getChild(rootEB.getId(), 0);
-
-    assertNotNull(firstEB);
-    assertEquals(2, firstEB.getBroadcastTables().size());
-    assertTrue(firstEB.getBroadcastTables().contains("default.supplier"));
-    assertTrue(firstEB.getBroadcastTables().contains("default.part"));
-  }
-
-  @Test
-  public final void testBroadcastTwoPartJoin() throws Exception {
-    ResultSet res = executeQuery();
-    assertEquals(FetchResultSet.class, res.getClass());
-    FetchResultSet resultSet = (FetchResultSet)res;
-
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    MasterPlan plan = getQueryPlan(resultSet.getQueryId());
-    ExecutionBlock rootEB = plan.getRoot();
-
-    /*
-    |-eb_1395996354406_0001_000010
-       |-eb_1395996354406_0001_000009
-          |-eb_1395996354406_0001_000008
-          |-eb_1395996354406_0001_000005
-     */
-    assertEquals(1, plan.getChildCount(rootEB.getId()));
-
-    ExecutionBlock firstJoinEB = plan.getChild(rootEB.getId(), 0);
-    assertNotNull(firstJoinEB);
-    assertEquals(NodeType.JOIN, firstJoinEB.getPlan().getType());
-    assertEquals(0, firstJoinEB.getBroadcastTables().size());
-
-    ExecutionBlock leafEB1 = plan.getChild(firstJoinEB.getId(), 0);
-    assertTrue(leafEB1.getBroadcastTables().contains("default.orders"));
-    assertTrue(leafEB1.getBroadcastTables().contains("default.part"));
-
-    ExecutionBlock leafEB2 = plan.getChild(firstJoinEB.getId(), 1);
-    assertTrue(leafEB2.getBroadcastTables().contains("default.nation"));
-  }
-
-  @Test
-  public final void testBroadcastSubquery() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testBroadcastSubquery2() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testBroadcastPartitionTable() throws Exception {
-    // If all tables participate in the BROADCAST JOIN, there is some missing data.
-    executeDDL("customer_partition_ddl.sql", null);
-    ResultSet res = executeFile("insert_into_customer_partition.sql");
-    res.close();
-
-    createMultiFile("nation", 2, new TupleCreator() {
-      public Tuple createTuple(String[] columnDatas) {
-        return new VTuple(new Datum[]{
-            new Int4Datum(Integer.parseInt(columnDatas[0])),
-            new TextDatum(columnDatas[1]),
-            new Int4Datum(Integer.parseInt(columnDatas[2])),
-            new TextDatum(columnDatas[3])
-        });
-      }
-    });
-
-    createMultiFile("orders", 1, new TupleCreator() {
-      public Tuple createTuple(String[] columnDatas) {
-        return new VTuple(new Datum[]{
-            new Int4Datum(Integer.parseInt(columnDatas[0])),
-            new Int4Datum(Integer.parseInt(columnDatas[1])),
-            new TextDatum(columnDatas[2])
-        });
-      }
-    });
-
-    res = executeQuery();
-    assertResultSet(res);
-    res.close();
-
-    executeString("DROP TABLE customer_broad_parts PURGE");
-    executeString("DROP TABLE nation_multifile PURGE");
-    executeString("DROP TABLE orders_multifile PURGE");
-  }
-
-  @Test
-  public final void testBroadcastMultiColumnPartitionTable() throws Exception {
-    String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
-    ResultSet res = testBase.execute(
-        "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
-    res.close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    res = executeString("insert overwrite into " + tableName
-        + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
-    res.close();
-
-    res = executeString(
-        "select distinct a.col3 from " + tableName + " as a " +
-            "left outer join lineitem_large b " +
-            "on a.col1 = b.l_orderkey order by a.col3"
-    );
-
-    assertResultSet(res);
-    cleanupQuery(res);
-  }
-
-  @Test
-  public final void testCasebyCase1() throws Exception {
-    // Left outer join with a small table and a large partition table which not matched any partition path.
-    String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
-    testBase.execute(
-        "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
-            "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
-            "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
-            "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
-            "partition by column(l_orderkey int4) ").close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    executeString("insert overwrite into " + tableName +
-        " select l_partkey, l_suppkey, l_linenumber, \n" +
-        " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
-        " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
-        " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem_large");
-
-    ResultSet res = executeString(
-        "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
-            "left outer join " + tableName + " b " +
-            "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
-    );
-
-    String expected = "key1,key2\n" +
-        "-------------------------------\n" +
-        "1,null\n" +
-        "1,null\n" +
-        "2,null\n" +
-        "3,null\n" +
-        "3,null\n";
-
-    try {
-      assertEquals(expected, resultSetToString(res));
-    } finally {
-      cleanupQuery(res);
-    }
-  }
-
-  @Test
-  public final void testInnerAndOuterWithEmpty() throws Exception {
-    executeDDL("customer_partition_ddl.sql", null);
-    executeFile("insert_into_customer_partition.sql").close();
-
-    // outer join table is empty
-    ResultSet res = executeString(
-        "select a.l_orderkey, b.o_orderkey, c.c_custkey from lineitem a " +
-            "inner join orders b on a.l_orderkey = b.o_orderkey " +
-            "left outer join customer_broad_parts c on a.l_orderkey = c.c_custkey and c.c_custkey < 0"
-    );
-
-    String expected = "l_orderkey,o_orderkey,c_custkey\n" +
-        "-------------------------------\n" +
-        "1,1,null\n" +
-        "1,1,null\n" +
-        "2,2,null\n" +
-        "3,3,null\n" +
-        "3,3,null\n";
-
-    assertEquals(expected, resultSetToString(res));
-    res.close();
-
-    executeString("DROP TABLE customer_broad_parts PURGE").close();
-  }
-
-  static interface TupleCreator {
-    public Tuple createTuple(String[] columnDatas);
-  }
-
-  private void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception {
-    // make multiple small file
-    String multiTableName = tableName + "_multifile";
-    executeDDL(multiTableName + "_ddl.sql", null);
-
-    TableDesc table = client.getTableDesc(multiTableName);
-    assertNotNull(table);
-
-    TableMeta tableMeta = table.getMeta();
-    Schema schema = table.getLogicalSchema();
-
-    File file = new File("src/test/tpch/" + tableName + ".tbl");
-
-    if (!file.exists()) {
-      file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl");
-    }
-    String[] rows = FileUtil.readTextFile(file).split("\n");
-
-    assertTrue(rows.length > 0);
-
-    int fileIndex = 0;
-
-    Appender appender = null;
-    for (int i = 0; i < rows.length; i++) {
-      if (i % numRowsEachFile == 0) {
-        if (appender != null) {
-          appender.flush();
-          appender.close();
-        }
-        Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
-        fileIndex++;
-        appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
-            .getAppender(tableMeta, schema, dataPath);
-        appender.init();
-      }
-      String[] columnDatas = rows[i].split("\\|");
-      Tuple tuple = tupleCreator.createTuple(columnDatas);
-      appender.addTuple(tuple);
-    }
-    appender.flush();
-    appender.close();
-  }
-
-  @Test
-  public final void testLeftOuterJoinLeftSideSmallTable() throws Exception {
-    KeyValueSet tableOptions = new KeyValueSet();
-    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-    String[] data = new String[]{ "1000000|a", "1000001|b", "2|c", "3|d", "4|e" };
-    TajoTestingCluster.createTable("table1", schema, tableOptions, data, 1);
-
-    data = new String[10000];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i;
-    }
-    TajoTestingCluster.createTable("table_large", schema, tableOptions, data, 2);
-
-    try {
-      ResultSet res = executeString(
-          "select a.id, b.name from table1 a left outer join table_large b on a.id = b.id order by a.id"
-      );
-
-      String expected = "id,name\n" +
-          "-------------------------------\n" +
-          "2,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable2\n" +
-          "3,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable3\n" +
-          "4,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable4\n" +
-          "1000000,null\n" +
-          "1000001,null\n";
-
-      assertEquals(expected, resultSetToString(res));
-
-      cleanupQuery(res);
-    } finally {
-      executeString("DROP TABLE table1 PURGE").close();
-      executeString("DROP TABLE table_large PURGE").close();
-    }
-  }
-
-
-  @Test
-  public final void testSelfJoin() throws Exception {
-    String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
-    ResultSet res = executeString(
-        "create table " + tableName + " (n_name text,"
-            + "  n_comment text, n_regionkey int8) USING csv "
-            + "WITH ('csvfile.delimiter'='|')"
-            + "PARTITION BY column(n_nationkey int8)");
-    res.close();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    res = executeString(
-        "insert overwrite into " + tableName
-            + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
-    res.close();
-
-    res = executeString(
-      "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
-      + " where a.n_nationkey in (1)");
-    String expected = resultSetToString(res);
-    res.close();
-
-    res = executeString(
-      "select a.n_nationkey, a.n_name from " + tableName + " a join "+tableName +
-      " b on a.n_nationkey = b.n_nationkey "
-      + " where a.n_nationkey in (1)");
-    String resultSetData = resultSetToString(res);
-    res.close();
-
-    assertEquals(expected, resultSetData);
-
-  }
-
-  @Test
-  public final void testSelfJoin2() throws Exception {
-    /*
-     https://issues.apache.org/jira/browse/TAJO-1102
-     See the following case.
-     CREATE TABLE orders_partition
-       (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
-          o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')
-       PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
-
-     select a.o_orderstatus, count(*) as cnt
-      from orders_partition a
-      inner join orders_partition b
-        on a.o_orderdate = b.o_orderdate
-            and a.o_orderstatus = b.o_orderstatus
-            and a.o_orderkey = b.o_orderkey
-      where a.o_orderdate='1995-02-21'
-        and a.o_orderstatus in ('F')
-      group by a.o_orderstatus;
-
-      Because of the where condition[where a.o_orderdate='1995-02-21 and a.o_orderstatus in ('F')],
-        orders_partition table aliased a is small and broadcast target.
-    */
-    String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders_large");
-    ResultSet res = executeString(
-        "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
-            "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')\n" +
-            "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)");
-    res.close();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    res = executeString(
-        "insert overwrite into " + tableName +
-            " select o_orderkey, o_custkey, o_totalprice, " +
-            " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
-            " from orders_large ");
-    res.close();
-
-    res = executeString(
-        "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
-            "from orders_large a " +
-            "join orders_large b on a.o_orderkey = b.o_orderkey " +
-            "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
-            " order by a.o_orderkey"
-    );
-    String expected = resultSetToString(res);
-    res.close();
-
-    res = executeString(
-        "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
-            "from " + tableName +
-            " a join "+ tableName + " b on a.o_orderkey = b.o_orderkey " +
-            "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
-            " order by a.o_orderkey"
-    );
-    String resultSetData = resultSetToString(res);
-    res.close();
-
-    assertEquals(expected, resultSetData);
-
-  }
-  @Test
-  public void testMultipleBroadcastDataFileWithZeroLength() throws Exception {
-    // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
-    // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node
-    createMultiFile("nation", 2, new TupleCreator() {
-      public Tuple createTuple(String[] columnDatas) {
-        return new VTuple(new Datum[]{
-            new Int4Datum(Integer.parseInt(columnDatas[0])),
-            new TextDatum(columnDatas[1]),
-            new Int4Datum(Integer.parseInt(columnDatas[2])),
-            new TextDatum(columnDatas[3])
-        });
-      }
-    });
-    addEmptyDataFile("nation_multifile", false);
-
-    ResultSet res = executeQuery();
-
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    executeString("DROP TABLE nation_multifile PURGE");
-  }
-
-  @Test
-  public void testMultipleBroadcastDataFileWithZeroLength2() throws Exception {
-    // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
-    // testMultipleBroadcastDataFileWithZeroLength2 testcase is for the non-leaf node
-    createMultiFile("nation", 2, new TupleCreator() {
-      public Tuple createTuple(String[] columnDatas) {
-        return new VTuple(new Datum[]{
-            new Int4Datum(Integer.parseInt(columnDatas[0])),
-            new TextDatum(columnDatas[1]),
-            new Int4Datum(Integer.parseInt(columnDatas[2])),
-            new TextDatum(columnDatas[3])
-        });
-      }
-    });
-    addEmptyDataFile("nation_multifile", false);
-
-    ResultSet res = executeQuery();
-
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    executeString("DROP TABLE nation_multifile PURGE");
-  }
-
-  @Test
-  public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
-    String tableName = CatalogUtil.normalizeIdentifier("nation_partitioned");
-    ResultSet res = testBase.execute(
-        "create table " + tableName + " (n_name text) partition by column(n_nationkey int4, n_regionkey int4) ");
-    res.close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    res = executeString("insert overwrite into " + tableName
-        + " select n_name, n_nationkey, n_regionkey from nation");
-    res.close();
-
-    addEmptyDataFile("nation_partitioned", true);
-
-    res = executeQuery();
-
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    executeString("DROP TABLE nation_partitioned PURGE");
-  }
-
-  @Test
-  public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
-    String tableName = CatalogUtil.normalizeIdentifier("nation_partitioned");
-    ResultSet res = testBase.execute(
-        "create table " + tableName + " (n_name text) partition by column(n_nationkey int4, n_regionkey int4) ");
-    res.close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
-    res = executeString("insert overwrite into " + tableName
-        + " select n_name, n_nationkey, n_regionkey from nation");
-    res.close();
-
-    addEmptyDataFile("nation_partitioned", true);
-
-    res = executeQuery();
-
-    assertResultSet(res);
-    cleanupQuery(res);
-
-    executeString("DROP TABLE nation_partitioned PURGE");
-  }
-
-  private void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception {
-    TableDesc table = client.getTableDesc(tableName);
-
-    Path path = new Path(table.getPath());
-    FileSystem fs = path.getFileSystem(conf);
-    if (isPartitioned) {
-      List<Path> partitionPathList = getPartitionPathList(fs, path);
-      for (Path eachPath: partitionPathList) {
-        Path dataPath = new Path(eachPath, 0 + "_empty.csv");
-        OutputStream out = fs.create(dataPath);
-        out.close();
-      }
-    } else {
-      Path dataPath = new Path(path, 0 + "_empty.csv");
-      OutputStream out = fs.create(dataPath);
-      out.close();
-    }
-  }
-
-  private List<Path> getPartitionPathList(FileSystem fs, Path path) throws Exception {
-    FileStatus[] files = fs.listStatus(path);
-    List<Path> paths = new ArrayList<Path>();
-    if (files != null) {
-      for (FileStatus eachFile: files) {
-        if (eachFile.isFile()) {
-          paths.add(path);
-          return paths;
-        } else {
-          paths.addAll(getPartitionPathList(fs, eachFile.getPath()));
-        }
-      }
-    }
-
-    return paths;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
index 34ead13..da0f59d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -18,89 +18,322 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.sql.ResultSet;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
+/*
+ * NOTE: Plan tests are disabled in TestJoinOnPartitionedTables.
+ * A plan that reads a partitioned table currently contains the HDFS paths of its input partitions.
+ * An example of such a path is hdfs://localhost:60305/tajo/warehouse/default/customer_parts/c_nationkey=1.
+ * Because a different HDFS port is used for each test run, it is difficult to test query plans that read partitioned tables.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestJoinOnPartitionedTables extends TestJoinQuery {
+
+  public TestJoinOnPartitionedTables(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+    client.executeQuery("CREATE TABLE if not exists customer_parts " +
+        "(c_custkey INT4, c_name TEXT, c_address TEXT, c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT) " +
+        "PARTITION BY COLUMN (c_nationkey INT4) as " +
+        "SELECT c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey FROM customer;");
+    client.executeQueryAndGetResult("create table if not exists nation_partitioned (n_name text) " +
+        "partition by column(n_nationkey int4, n_regionkey int4) " +
+        "as select n_name, n_nationkey, n_regionkey from nation");
+    addEmptyDataFile("nation_partitioned", true);
+  }
 
-  public TestJoinOnPartitionedTables() {
-    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  @AfterClass
+  public static void classTearDown() throws ServiceException {
+    TestJoinQuery.classTearDown();
+    client.executeQuery("DROP TABLE IF EXISTS customer_parts PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS nation_partitioned PURGE");
   }
 
   @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
   public void testPartitionTableJoinSmallTable() throws Exception {
-    executeDDL("customer_ddl.sql", null);
-    ResultSet res = executeFile("insert_into_customer.sql");
-    res.close();
+    runSimpleTests();
+  }
 
-    res = executeQuery();
-    assertResultSet(res);
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testNoProjectionJoinQual() throws Exception {
+    runSimpleTests();
+  }
 
-    res = executeFile("selfJoinOfPartitionedTable.sql");
-    assertResultSet(res, "selfJoinOfPartitionedTable.result");
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDown() throws Exception {
+    runSimpleTests();
+  }
 
-    res = executeFile("testNoProjectionJoinQual.sql");
-    assertResultSet(res, "testNoProjectionJoinQual.result");
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin() throws Exception {
+    runSimpleTests();
+  }
 
-    res = executeFile("testPartialFilterPushDown.sql");
-    assertResultSet(res, "testPartialFilterPushDown.result");
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin2() throws Exception {
+    runSimpleTests();
+  }
 
-    res = executeFile("testPartialFilterPushDownOuterJoin.sql");
-    assertResultSet(res, "testPartialFilterPushDownOuterJoin.result");
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void selfJoinOfPartitionedTable() throws Exception {
+    runSimpleTests();
+  }
 
-    res = executeFile("testPartialFilterPushDownOuterJoin2.sql");
-    assertResultSet(res, "testPartialFilterPushDownOuterJoin2.result");
-    res.close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest(queries = {
+      @QuerySpec("select a.c_custkey, b.c_custkey from " +
+          "  (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          "   union all " +
+          "   select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          ") a " +
+          "left outer join customer_parts b " +
+          "on a.c_custkey = b.c_custkey " +
+          "and a.c_nationkey > 0")
+  })
+  public void testPartitionMultiplePartitionFilter() throws Exception {
+    runSimpleTests();
+  }
 
-    executeString("DROP TABLE customer_parts PURGE").close();
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
+    runSimpleTests();
   }
 
   @Test
-  public void testPartitionMultiplePartitionFilter() throws Exception {
-    executeDDL("customer_ddl.sql", null);
-    ResultSet res = executeFile("insert_into_customer.sql");
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  public final void testCasebyCase1() throws Exception {
+    // Left outer join of a small table with a large partitioned table where no partition path is matched.
+    String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
+    executeString(
+        "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
+            "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
+            "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
+            "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
+            "partition by column(l_orderkey int4) ").close();
+
+    try {
+      executeString("insert overwrite into " + tableName +
+          " select l_partkey, l_suppkey, l_linenumber, \n" +
+          " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
+          " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
+          " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem");
+
+      ResultSet res = executeString(
+          "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
+              "left outer join " + tableName + " b " +
+              "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
+      );
+
+      String expected = "key1,key2\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "1,null\n" +
+          "2,null\n" +
+          "3,null\n" +
+          "3,null\n";
+      assertEquals(expected, resultSetToString(res));
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  // TODO: This test is disabled for now; it should be re-enabled after TAJO-1600 is resolved.
+//  @Test
+  public final void testBroadcastMultiColumnPartitionTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
+    ResultSet res = testBase.execute(
+        "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
     res.close();
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString("insert overwrite into " + tableName
+          + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
+      res.close();
+
+      res = executeString(
+          "select distinct a.col3 from " + tableName + " as a " +
+              "left outer join lineitem b " +
+              "on a.col1 = b.l_orderkey order by a.col3"
+      );
+
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
 
-    res = executeString(
-        "select a.c_custkey, b.c_custkey from " +
-            "  (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
-            "   union all " +
-            "   select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
-            ") a " +
-            "left outer join customer_parts b " +
-            "on a.c_custkey = b.c_custkey " +
-            "and a.c_nationkey > 0"
-    );
-
-    String expected =
-        "c_custkey,c_custkey\n" +
-            "-------------------------------\n";
-    assertEquals(expected, resultSetToString(res));
+  @Test
+  public final void testSelfJoin() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
+    ResultSet res = executeString(
+        "create table " + tableName + " (n_name text,"
+            + "  n_comment text, n_regionkey int8) USING csv "
+            + "WITH ('csvfile.delimiter'='|')"
+            + "PARTITION BY column(n_nationkey int8)");
     res.close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString(
+          "insert overwrite into " + tableName
+              + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
+      res.close();
+
+      res = executeString(
+          "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+              + " where a.n_nationkey in (1)");
+      String expected = resultSetToString(res);
+      res.close();
 
-    executeString("DROP TABLE customer_parts PURGE").close();
+      res = executeString(
+          "select a.n_nationkey, a.n_name from " + tableName + " a join " + tableName +
+              " b on a.n_nationkey = b.n_nationkey "
+              + " where a.n_nationkey in (1)");
+      String resultSetData = resultSetToString(res);
+      res.close();
+
+      assertEquals(expected, resultSetData);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
   }
 
   @Test
-  public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
-    executeDDL("customer_ddl.sql", null);
-    ResultSet res = executeFile("insert_into_customer.sql");
+  public final void testSelfJoin2() throws Exception {
+    /*
+     https://issues.apache.org/jira/browse/TAJO-1102
+     See the following case.
+     CREATE TABLE orders_partition
+       (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
+          o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')
+       PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
+
+     select a.o_orderstatus, count(*) as cnt
+      from orders_partition a
+      inner join orders_partition b
+        on a.o_orderdate = b.o_orderdate
+            and a.o_orderstatus = b.o_orderstatus
+            and a.o_orderkey = b.o_orderkey
+      where a.o_orderdate='1995-02-21'
+        and a.o_orderstatus in ('F')
+      group by a.o_orderstatus;
+
+      Because of the where condition [where a.o_orderdate='1995-02-21' and a.o_orderstatus in ('F')],
+        the orders_partition table aliased as a is small and becomes a broadcast target.
+    */
+    String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders");
+    ResultSet res = executeString(
+        "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
+            "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')\n" +
+            "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)");
     res.close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString(
+          "insert overwrite into " + tableName +
+              " select o_orderkey, o_custkey, o_totalprice, " +
+              " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
+              " from orders ");
+      res.close();
+
+      res = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
+              "from orders a " +
+              "join orders b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
+              " order by a.o_orderkey"
+      );
+      String expected = resultSetToString(res);
+      res.close();
+
+      res = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
+              "from " + tableName +
+              " a join " + tableName + " b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
+              " order by a.o_orderkey"
+      );
+      String resultSetData = resultSetToString(res);
+      res.close();
 
-    res = executeQuery();
-    assertResultSet(res);
+      cleanupQuery(res);
+      assertEquals(expected, resultSetData);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastPartitionTable() throws Exception {
+    // If all tables participate in the BROADCAST JOIN, there is some missing data.
+    executeDDL("customer_partition_ddl.sql", null);
+    ResultSet res = executeFile("insert_into_customer_partition.sql");
     res.close();
 
-    executeString("DROP TABLE customer_parts PURGE").close();
+    try {
+      runSimpleTests();
+    } finally {
+      executeString("DROP TABLE customer_broad_parts PURGE");
+    }
   }
 }