You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:13 UTC

[34/51] [partial] tajo git commit: TAJO-1761: Separate an integration unit test kit into an independent module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
new file mode 100644
index 0000000..d0088a5
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -0,0 +1,882 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.sql.ResultSet;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestInsertQuery extends QueryTestCaseBase {
+
+  /**
+   * Creates {@code table1} from {@code table1_ddl.sql}, executes {@code testInsertOverwrite.sql},
+   * and checks that the catalog reports five rows for the table (the row-count check is skipped
+   * when the Hive catalog store is running).
+   */
+  @Test
+  public final void testInsertOverwrite() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    // Drop the table even when an assertion fails, so that a failure here does not leave
+    // table1 behind for the other tests in this class that create a table of the same name.
+    try {
+      res = executeFile("testInsertOverwrite.sql");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  /**
+   * Executes {@code testInsertOverwrite.sql} and then {@code testInsertInto.sql} against
+   * {@code table1} and verifies that the table ends up with two data files, named
+   * {@code part-N-N-N} with the last number equal to the file's position in the listing, whose
+   * combined contents hold the same five rows twice.
+   */
+  @Test
+  public final void testInsertInto() throws Exception {
+    // create table and upload test data
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    // Drop the table even when an assertion fails, so that a failure here does not leave
+    // table1 behind for the other tests in this class that create a table of the same name.
+    try {
+      res = executeFile("testInsertOverwrite.sql");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeFile("testInsertInto.sql");
+      res.close();
+
+      List<Path> dataFiles = listTableFiles("table1");
+      assertEquals(2, dataFiles.size());
+
+      for (int i = 0; i < dataFiles.size(); i++) {
+        String name = dataFiles.get(i).getName();
+        assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+        String[] tokens = name.split("-");
+        assertEquals(4, tokens.length);
+        assertEquals(i, Integer.parseInt(tokens[3]));
+      }
+
+      String tableDatas = getTableFileContents("table1");
+
+      String expected = "1|1|17.0\n" +
+          "1|1|36.0\n" +
+          "2|2|38.0\n" +
+          "3|2|45.0\n" +
+          "3|3|49.0\n" +
+          "1|1|17.0\n" +
+          "1|1|36.0\n" +
+          "2|2|38.0\n" +
+          "3|2|45.0\n" +
+          "3|3|49.0\n";
+
+      assertNotNull(tableDatas);
+      assertEquals(expected, tableDatas);
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  /** Runs the INSERT INTO LOCATION checks against {@code /tajo-data/testInsertIntoLocation}. */
+  @Test
+  public final void testInsertIntoLocation() throws Exception {
+    assertTestInsertIntoLocation(new Path("/tajo-data/testInsertIntoLocation"));
+  }
+
+  /**
+   * Runs the INSERT INTO LOCATION checks against the directory returned by
+   * {@code CommonTestingUtil.getTestDir()}.
+   */
+  @Test
+  public final void testInsertIntoLocationDifferentFSs() throws Exception {
+    assertTestInsertIntoLocation(CommonTestingUtil.getTestDir());
+  }
+
+  /**
+   * Inserts three lineitem columns into {@code path} twice with INSERT INTO LOCATION and
+   * verifies, after each insert, the concatenated file contents, the number of entries in the
+   * directory (one, then two), and the {@code part-N-N-N} naming of each entry. The directory
+   * is deleted recursively afterwards.
+   *
+   * @param path target directory of the insert
+   * @throws Exception if a query or a file system call fails
+   */
+  public final void assertTestInsertIntoLocation(Path path) throws Exception {
+    // Resolve the file system before running any query so that the cleanup in the finally
+    // block also runs when the first insert or one of the early assertions fails.
+    FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
+
+    try {
+      executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close();
+
+      String resultFileData = getTableFileContents(path);
+      String expected = "1|1|1\n" +
+          "1|1|2\n" +
+          "2|2|1\n" +
+          "3|2|1\n" +
+          "3|3|2\n";
+
+      assertEquals(expected, resultFileData);
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(1, files.length);
+
+      for (FileStatus eachFileStatus : files) {
+        String name = eachFileStatus.getPath().getName();
+        assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+      }
+
+      // The second insert must add to the existing data instead of replacing it.
+      executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close();
+      resultFileData = getTableFileContents(path);
+
+      assertEquals(expected + expected, resultFileData);
+
+      files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
+      for (FileStatus eachFileStatus : files) {
+        String name = eachFileStatus.getPath().getName();
+        assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+      }
+    } finally {
+      fs.delete(path, true);
+    }
+  }
+
+  /**
+   * Creates a CSV table partitioned by {@code n_nationkey}, inserts the nation table into it
+   * twice, and verifies the query output after each insert. Finally checks the directory layout:
+   * 25 partition directories whose names start with {@code n_nationkey=}, each holding two
+   * {@code part-N-N-N} data files. The table is dropped in all cases.
+   */
+  @Test
+  public final void testInsertIntoPartitionedTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoPartitionedTable");
+    executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" +
+        "USING csv PARTITION by column(n_nationkey INT4)" ).close();
+
+    try {
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
+
+      ResultSet res = executeString("select * from " + tableName);
+
+      String expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
+      res = executeString("select * from " + tableName);
+      expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      // Release the second result set as well; only the first one used to be closed.
+      res.close();
+
+      TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName);
+      assertNotNull(tableDesc);
+
+      Path path = new Path(tableDesc.getUri());
+      FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(25, files.length);
+
+      for (FileStatus eachFileStatus: files) {
+        assertTrue(eachFileStatus.getPath().getName().startsWith("n_nationkey="));
+        // Two inserts were executed, so every partition directory must contain two data files.
+        FileStatus[] dataFiles = fs.listStatus(eachFileStatus.getPath());
+        assertEquals(2, dataFiles.length);
+        for (FileStatus eachDataFileStatus: dataFiles) {
+          String name = eachDataFileStatus.getPath().getName();
+          assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+        }
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteSmallerColumns() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+    TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+
+    res = executeFile("testInsertOverwriteSmallerColumns.sql");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+    assertEquals(originalDesc.getSchema(), desc.getSchema());
+
+    executeString("DROP TABLE table1 PURGE");
+  }
+
+  /**
+   * Executes {@code testInsertOverwriteWithTargetColumns.sql} against {@code table1} and
+   * verifies each of the five resulting rows: column 1 and column 3 carry the expected values
+   * while column 2 is SQL NULL. Also checks that the table schema is unchanged.
+   */
+  @Test
+  public final void testInsertOverwriteWithTargetColumns() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    // Drop the table even when an assertion fails, so that a failure here does not leave
+    // table1 behind for the other tests in this class that create a table of the same name.
+    try {
+      TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+
+      res = executeFile("testInsertOverwriteWithTargetColumns.sql");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeString("select * from " + CatalogUtil.denormalizeIdentifier(getCurrentDatabase()) + ".table1");
+
+      assertRowWithNullSecondColumn(res, 1, 17.0f);
+      assertRowWithNullSecondColumn(res, 1, 36.0f);
+      assertRowWithNullSecondColumn(res, 2, 38.0f);
+      // The fourth row previously had no check on column 1, unlike every other row.
+      assertRowWithNullSecondColumn(res, 3, 45.0f);
+      assertRowWithNullSecondColumn(res, 3, 49.0f);
+
+      assertFalse(res.next());
+      res.close();
+
+      assertEquals(originalDesc.getSchema(), desc.getSchema());
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  /**
+   * Advances {@code res} by one row and asserts that column 1 equals {@code expectedCol1},
+   * column 2 reads as {@code 0f} and was SQL NULL, and column 3 equals {@code expectedCol3}.
+   */
+  private static void assertRowWithNullSecondColumn(ResultSet res, long expectedCol1, float expectedCol3)
+      throws Exception {
+    assertTrue(res.next());
+    assertEquals(expectedCol1, res.getLong(1));
+    assertTrue(0f == res.getFloat(2));
+    // wasNull() reports on the most recent getter call, so it must directly follow getFloat(2).
+    assertTrue(res.wasNull());
+    assertTrue(expectedCol3 == res.getFloat(3));
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsterisk() throws Exception {
+    ResultSet res = executeFile("full_table_csv_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_csv"));
+
+    res = executeString("insert overwrite into full_table_csv select * from default.lineitem where l_orderkey = 3");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_csv");
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(2, desc.getStats().getNumRows().intValue());
+    }
+    executeString("DROP TABLE full_table_csv PURGE");
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsteriskAndMore() throws Exception {
+    ResultSet res = executeFile("lineitem_year_month_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "lineitem_year_month"));
+
+    res = executeFile("load_to_lineitem_year_month.sql");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "lineitem_year_month");
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeQuery();
+    assertResultSet(res);
+    res.close();
+
+    executeString("DROP TABLE lineitem_year_month PURGE");
+  }
+
+  /**
+   * Creates a table from all lineitem order keys (five rows expected in the catalog), then
+   * overwrites it with the order keys equal to 3 (two rows expected). Both statements must
+   * return an empty result set. Row-count checks are skipped when the Hive catalog store is
+   * running.
+   */
+  @Test
+  public final void testInsertOverwriteIntoSelect() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("insertoverwriteintoselect");
+    ResultSet createResult = executeString("create table " + tableName + " as select l_orderkey from default.lineitem");
+    assertFalse(createResult.next());
+    createResult.close();
+
+    CatalogService catalogService = testingCluster.getMaster().getCatalog();
+    assertTrue(catalogService.existsTable(getCurrentDatabase(), tableName));
+    TableDesc tableDesc = catalogService.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, tableDesc.getStats().getNumRows().intValue());
+    }
+
+    // Only two lineitem rows have l_orderkey = 3, so the overwrite leaves two rows.
+    ResultSet overwriteResult =
+        executeString("insert overwrite into " + tableName + " select l_orderkey from default.lineitem where l_orderkey = 3");
+    assertFalse(overwriteResult.next());
+    overwriteResult.close();
+
+    assertTrue(catalogService.existsTable(getCurrentDatabase(), tableName));
+    tableDesc = catalogService.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(2, tableDesc.getStats().getNumRows().intValue());
+    }
+    executeString("DROP TABLE " + tableName + " PURGE");
+  }
+
+  @Test
+  public final void testInsertOverwriteCapitalTableName() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteCapitalTableName");
+    ResultSet res = executeString("create table " + tableName + " as select * from default.lineitem");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    res = executeString("insert overwrite into " + tableName + " select * from default.lineitem where l_orderkey = 3");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(2, desc.getStats().getNumRows().intValue());
+    }
+    executeString("DROP TABLE " + tableName + " PURGE");
+  }
+
+  /**
+   * Runs {@code executeQuery()} and checks that the directory
+   * {@code /tajo-data/testInsertOverwriteCapitalTableName} exists and has exactly one entry.
+   */
+  @Test
+  public final void testInsertOverwriteLocation() throws Exception {
+    executeQuery().close();
+
+    FileSystem fileSystem = FileSystem.get(testingCluster.getConfiguration());
+    Path outputDir = new Path("/tajo-data/testInsertOverwriteCapitalTableName");
+    assertTrue(fileSystem.exists(outputDir));
+    assertEquals(1, fileSystem.listStatus(outputDir).length);
+  }
+
+  /**
+   * Creates the table from {@code testInsertOverwriteWithCompression_ddl.sql}, runs
+   * {@code executeQuery()}, and checks that the catalog reports two rows (unless the Hive
+   * catalog store is running) and that every file under the table URI resolves to
+   * {@link DeflateCodec}.
+   */
+  @Test
+  public final void testInsertOverwriteWithCompression() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteWithCompression");
+    executeFile("testInsertOverwriteWithCompression_ddl.sql").close();
+
+    CatalogService catalogService = testingCluster.getMaster().getCatalog();
+    assertTrue(catalogService.existsTable(getCurrentDatabase(), tableName));
+
+    executeQuery().close();
+
+    TableDesc tableDesc = catalogService.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(2, tableDesc.getStats().getNumRows().intValue());
+    }
+
+    FileSystem fileSystem = FileSystem.get(testingCluster.getConfiguration());
+    Path tablePath = new Path(tableDesc.getUri());
+    assertTrue(fileSystem.exists(tablePath));
+
+    // The codec is looked up from each file's path by the Hadoop codec factory.
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(testingCluster.getConfiguration());
+    for (FileStatus dataFile : fileSystem.listStatus(tablePath)) {
+      CompressionCodec detected = codecFactory.getCodec(dataFile.getPath());
+      assertTrue(detected instanceof DeflateCodec);
+    }
+    executeString("DROP TABLE " + tableName + " PURGE");
+  }
+
+  /**
+   * Runs {@code executeQuery()} and checks that
+   * {@code /tajo-data/testInsertOverwriteLocationWithCompression} exists, has exactly one entry,
+   * and that every entry resolves to {@link DeflateCodec}. Does nothing when the Hive catalog
+   * store is running.
+   */
+  @Test
+  public final void testInsertOverwriteLocationWithCompression() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+
+    executeQuery().close();
+
+    FileSystem fileSystem = FileSystem.get(testingCluster.getConfiguration());
+    Path outputDir = new Path("/tajo-data/testInsertOverwriteLocationWithCompression");
+    assertTrue(fileSystem.exists(outputDir));
+    assertEquals(1, fileSystem.listStatus(outputDir).length);
+
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(testingCluster.getConfiguration());
+    for (FileStatus dataFile : fileSystem.listStatus(outputDir)) {
+      CompressionCodec detected = codecFactory.getCodec(dataFile.getPath());
+      assertTrue(detected instanceof DeflateCodec);
+    }
+  }
+
+  /**
+   * Overwrites {@code full_table_parquet} with the lineitem rows whose {@code l_orderkey} is 3,
+   * checks that the catalog reports two rows, and verifies a full and a two-column projection
+   * of the table with {@code assertResultSet}. Does nothing when the Hive catalog store is
+   * running.
+   */
+  @Test
+  public final void testInsertOverwriteWithAsteriskUsingParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      ResultSet res = executeFile("full_table_parquet_ddl.sql");
+      res.close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_parquet"));
+
+      res = executeString(
+          "insert overwrite into full_table_parquet select * from default.lineitem where l_orderkey = 3");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_parquet");
+      // No second Hive check is needed here: the enclosing guard already excluded that case.
+      assertEquals(2, desc.getStats().getNumRows().intValue());
+
+      res = executeString("select * from full_table_parquet;");
+      assertResultSet(res);
+      // Close each result set after verification, as the other tests in this class do.
+      res.close();
+
+      res = executeString("select l_orderkey, l_partkey from full_table_parquet;");
+      assertResultSet(res, "testInsertOverwriteWithAsteriskUsingParquet2.result");
+      res.close();
+
+      executeString("DROP TABLE full_table_parquet PURGE");
+    }
+  }
+
+  /**
+   * Creates a Parquet table, overwrites it with three columns derived from lineitem, checks
+   * that the catalog reports five rows, and compares the selected rows with the expected text.
+   * Does nothing when the Hive catalog store is running.
+   */
+  @Test
+  public final void testInsertOverwriteIntoParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      executeString("create table parquet_table " +
+          "(l_orderkey int4, l_shipdate text, l_shipdate_function text) using parquet").close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
+
+      executeString(
+          "insert overwrite into parquet_table  " +
+              "select l_orderkey, l_shipdate, substr(l_shipdate, 1, 10) from default.lineitem").close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
+      // No second Hive check is needed here: the enclosing guard already excluded that case.
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+
+      ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
+          "from parquet_table ");
+
+      String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
+          "-------------------------------\n" +
+          "1,1996-03-13,1996-03-13\n" +
+          "1,1996-04-12,1996-04-12\n" +
+          "2,1997-01-28,1997-01-28\n" +
+          "3,1994-02-02,1994-02-02\n" +
+          "3,1993-11-09,1993-11-09\n";
+
+      assertEquals(expected, resultSetToString(res));
+      // Release the result set; it used to be left open.
+      res.close();
+
+      executeString("DROP TABLE parquet_table PURGE");
+    }
+  }
+
+  /**
+   * Creates a Parquet table partitioned by {@code l_shipdate}, overwrites it from lineitem,
+   * checks that the catalog reports five rows, and compares the selected rows with the expected
+   * text. Does nothing when the Hive catalog store is running.
+   */
+  @Test
+  public final void testInsertOverwriteIntoPartitionedParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      executeString("create table parquet_table " +
+          "(l_orderkey int4, l_shipdate_function text) using parquet partition by column (l_shipdate text)").close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
+
+      executeString(
+          "insert overwrite into parquet_table  " +
+              "select l_orderkey, substr(l_shipdate, 1, 10), l_shipdate from default.lineitem").close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
+      // No second Hive check is needed here: the enclosing guard already excluded that case.
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+
+      ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
+          "from parquet_table ");
+
+      String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
+          "-------------------------------\n" +
+          "3,1993-11-09,1993-11-09\n" +
+          "3,1994-02-02,1994-02-02\n" +
+          "1,1996-03-13,1996-03-13\n" +
+          "1,1996-04-12,1996-04-12\n" +
+          "2,1997-01-28,1997-01-28\n";
+
+      assertEquals(expected, resultSetToString(res));
+      // Release the result set; it used to be left open.
+      res.close();
+
+      executeString("DROP TABLE parquet_table PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithDatabase() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    res = executeQuery();
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+    executeString("DROP TABLE table1 PURGE");
+  }
+
+  /**
+   * Overwrites a three-column table with the constant row {@code (1, 2.1, 'test')} produced by a
+   * SELECT without a FROM clause, then reads the row back and checks every column.
+   */
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    res = executeString("insert overwrite into " + tableName
+        + " select 1::INT4, 2.1::FLOAT4, 'test'; ");
+
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(1, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + ";");
+    assertTrue(res.next());
+
+    assertEquals(3, res.getMetaData().getColumnCount());
+    assertEquals(1, res.getInt(1));
+    // A tight tolerance: the previous delta of 10 accepted any value between -7.9 and 12.1.
+    assertEquals(2.1f, res.getFloat(2), 1e-6);
+    assertEquals("test", res.getString(3));
+
+    res.close();
+    executeString("DROP TABLE " + tableName + " PURGE");
+  }
+
+  /**
+   * Overwrites only {@code col1} and {@code col3} of a three-column table with the constant row
+   * {@code (1, 'test')}, then reads the row back and checks that the unlisted {@code col2} is
+   * NULL.
+   */
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery2");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+    res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(1, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + ";");
+    assertTrue(res.next());
+
+    assertEquals(3, res.getMetaData().getColumnCount());
+    assertEquals(1, res.getInt(1));
+    assertNull(res.getString(2));
+    // A NULL column reads back as exactly 0 through getDouble; the previous delta of 10
+    // accepted any value between -10 and 10.
+    assertEquals(0.0, res.getDouble(2), 0.0);
+    assertEquals("test", res.getString(3));
+
+    res.close();
+    executeString("DROP TABLE " + tableName + " PURGE");
+  }
+
+  /**
+   * Writes the constant row {@code (1, 2.1, 'test')} to
+   * {@code /tajo-data/testInsertOverwritePathWithNonFromQuery} as '|'-delimited, Deflate-compressed
+   * CSV, then checks that the directory holds one file, that the file resolves to
+   * {@link DeflateCodec}, and that its first decompressed line carries the three values.
+   */
+  @Test
+  public final void testInsertOverwritePathWithNonFromQuery() throws Exception {
+    executeString("insert overwrite into location " +
+        "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " +
+        "USING csv WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+        "select 1::INT4, 2.1::FLOAT4, 'test'").close();
+
+    FileSystem fileSystem = FileSystem.get(testingCluster.getConfiguration());
+    Path outputDir = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery");
+    assertTrue(fileSystem.exists(outputDir));
+    assertEquals(1, fileSystem.listStatus(outputDir).length);
+
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(testingCluster.getConfiguration());
+    FileStatus dataFile = fileSystem.listStatus(outputDir)[0];
+    CompressionCodec detected = codecFactory.getCodec(dataFile.getPath());
+    assertTrue(detected instanceof DeflateCodec);
+
+    // Read the file back through the detected codec to inspect the decompressed text.
+    BufferedReader lineReader = new BufferedReader(
+        new InputStreamReader(detected.createInputStream(fileSystem.open(dataFile.getPath()))));
+
+    try {
+      String firstLine = lineReader.readLine();
+      assertNotNull(firstLine);
+
+      String[] fields = firstLine.split("\\|");
+
+      assertEquals(3, fields.length);
+      assertEquals("1", fields[0]);
+      assertEquals("2.1", fields[1]);
+      assertEquals("test", fields[2]);
+    } finally {
+      lineReader.close();
+    }
+  }
+
+  /**
+   * Executes {@code testInsertOverwriteWithUnion.sql} against {@code table1} and compares the
+   * table's file contents with the eight expected rows.
+   */
+  @Test
+  public final void testInsertOverwriteWithUnion() throws Exception {
+    executeFile("table1_ddl.sql").close();
+
+    CatalogService catalogService = testingCluster.getMaster().getCatalog();
+    assertTrue(catalogService.existsTable(getCurrentDatabase(), "table1"));
+
+    executeFile("testInsertOverwriteWithUnion.sql").close();
+
+    String actual = getTableFileContents("table1");
+    assertNotNull(actual);
+
+    String expected = "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n" +
+        "1|3|173665.47\n" +
+        "2|4|46929.18\n" +
+        "3|2|193846.25\n";
+    assertEquals(expected, actual);
+
+    executeString("DROP TABLE table1 PURGE");
+  }
+
+  /**
+   * Executes {@code testInsertOverwriteWithUnionDifferentAlias.sql} against {@code table1} and
+   * compares the table's file contents with the eight expected rows.
+   */
+  @Test
+  public final void testInsertOverwriteWithUnionDifferentAlias() throws Exception {
+    executeFile("table1_ddl.sql").close();
+
+    CatalogService catalogService = testingCluster.getMaster().getCatalog();
+    assertTrue(catalogService.existsTable(getCurrentDatabase(), "table1"));
+
+    executeFile("testInsertOverwriteWithUnionDifferentAlias.sql").close();
+
+    String actual = getTableFileContents("table1");
+    assertNotNull(actual);
+
+    String expected = "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n" +
+        "1|3|173665.47\n" +
+        "2|4|46929.18\n" +
+        "3|2|193846.25\n";
+    assertEquals(expected, actual);
+
+    executeString("DROP TABLE table1 PURGE");
+  }
+
+  /**
+   * Executes {@code testInsertOverwriteLocationWithUnion.sql} and compares the file contents
+   * under {@code /tajo-data/testInsertOverwriteLocationWithUnion} with the eight expected rows.
+   */
+  @Test
+  public final void testInsertOverwriteLocationWithUnion() throws Exception {
+    executeFile("testInsertOverwriteLocationWithUnion.sql").close();
+
+    Path outputDir = new Path("/tajo-data/testInsertOverwriteLocationWithUnion");
+    String actual = getTableFileContents(outputDir);
+    assertNotNull(actual);
+
+    String expected = "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n" +
+        "1|3|173665.47\n" +
+        "2|4|46929.18\n" +
+        "3|2|193846.25\n";
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Executes {@code testInsertOverwriteLocationWithUnionDifferenceAlias.sql} and compares the
+   * file contents under {@code /tajo-data/testInsertOverwriteLocationWithUnionDifferenceAlias}
+   * with the eight expected rows.
+   */
+  @Test
+  public final void testInsertOverwriteLocationWithUnionDifferenceAlias() throws Exception {
+    executeFile("testInsertOverwriteLocationWithUnionDifferenceAlias.sql").close();
+
+    Path outputDir = new Path("/tajo-data/testInsertOverwriteLocationWithUnionDifferenceAlias");
+    String actual = getTableFileContents(outputDir);
+    assertNotNull(actual);
+
+    String expected = "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n" +
+        "1|3|173665.47\n" +
+        "2|4|46929.18\n" +
+        "3|2|193846.25\n";
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Creates {@code nation_diff} from {@code nation_diff_col_order.ddl}, executes
+   * {@code testInsertWithDifferentColumnOrder.sql}, and verifies the table contents with
+   * {@code assertResultSet}. The table is dropped in all cases.
+   */
+  @Test
+  public final void testInsertWithDifferentColumnOrder() throws Exception {
+    ResultSet res = executeFile("nation_diff_col_order.ddl");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff"));
+
+    try {
+      res = executeFile("testInsertWithDifferentColumnOrder.sql");
+      res.close();
+
+      res = executeString("select * from nation_diff");
+      assertResultSet(res);
+      // Close the result set after verification, as the other tests in this class do.
+      res.close();
+    } finally {
+      executeString("drop table nation_diff purge;");
+    }
+  }
+
+  /**
+   * Creates {@code test1} from {@code test1_nolength_ddl.sql}, executes
+   * {@code testInsertIntoSelectWithFixedSizeCharWithNoLength.sql}, and checks that the table's
+   * file contents, with NUL characters removed, equal {@code "a\n"}.
+   */
+  @Test
+  public final void testFixedCharSelectWithNoLength() throws Exception {
+    ResultSet res = executeFile("test1_nolength_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
+
+    res = executeFile("testInsertIntoSelectWithFixedSizeCharWithNoLength.sql");
+    res.close();
+
+    // Check for null before dereferencing; the old check ran after replaceAll() and so could
+    // never fail.
+    String rawContents = getTableFileContents("test1");
+    assertNotNull(rawContents);
+
+    //remove \0
+    String resultDatas = rawContents.replace("\0", "");
+    String expected = "a\n";
+
+    assertEquals(expected.length(), resultDatas.length());
+    assertEquals(expected, resultDatas);
+    executeString("DROP TABLE test1 PURGE");
+  }
+
+  /**
+   * Creates {@code test1} from {@code test1_ddl.sql}, executes
+   * {@code testInsertIntoSelectWithFixedSizeChar.sql}, and checks that the table's file
+   * contents, with NUL characters removed, equal the three expected lines.
+   */
+  @Test
+  public final void testFixedCharSelect() throws Exception {
+    ResultSet res = executeFile("test1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
+
+    res = executeFile("testInsertIntoSelectWithFixedSizeChar.sql");
+    res.close();
+
+    // Check for null before dereferencing; the old check ran after replaceAll() and so could
+    // never fail.
+    String rawContents = getTableFileContents("test1");
+    assertNotNull(rawContents);
+
+    //remove \0
+    String resultDatas = rawContents.replace("\0", "");
+    String expected = "a\n" +
+      "abc\n" +
+      "abcde\n";
+
+    assertEquals(expected.length(), resultDatas.length());
+    assertEquals(expected, resultDatas);
+    executeString("DROP TABLE test1 PURGE");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
new file mode 100644
index 0000000..d2585a7
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * NOTE: Plan tests are disabled in TestJoinOnPartitionedTables.
+ * A plan that reads a partitioned table currently contains the HDFS paths of its input partitions.
+ * An example of such a path is hdfs://localhost:60305/tajo/warehouse/default/customer_parts/c_nationkey=1.
+ * Because a different HDFS port is used for each test run, the plan text is not stable,
+ * which makes it difficult to test query plans that read partitioned tables.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+@NotThreadSafe
+public class TestJoinOnPartitionedTables extends TestJoinQuery {
+
+  /**
+   * Creates a test instance for one join option; the class runs under the
+   * {@code Parameterized} runner.
+   *
+   * @param joinOption join option string, passed unchanged to the {@link TestJoinQuery} constructor
+   */
+  public TestJoinOnPartitionedTables(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  /**
+   * Runs the shared {@link TestJoinQuery#setup()} and then creates the two partitioned
+   * tables used by this class: {@code customer_parts} (one partition column) and
+   * {@code nation_partitioned} (two partition columns). An empty data file is added to
+   * the partition directories of {@code nation_partitioned}.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+
+    final String createCustomerParts = "CREATE TABLE if not exists customer_parts " +
+        "(c_custkey INT4, c_name TEXT, c_address TEXT, c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT) " +
+        "PARTITION BY COLUMN (c_nationkey INT4) as " +
+        "SELECT c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey FROM customer;";
+    client.executeQuery(createCustomerParts);
+
+    final String createNationPartitioned = "create table if not exists nation_partitioned (n_name text) " +
+        "partition by column(n_nationkey int4, n_regionkey int4) " +
+        "as select n_name, n_nationkey, n_regionkey from nation";
+    client.executeQueryAndGetResult(createNationPartitioned);
+
+    addEmptyDataFile("nation_partitioned", true);
+  }
+
+  /**
+   * Runs the shared {@link TestJoinQuery#classTearDown()} and then drops the two
+   * partitioned tables created by {@link #setup()}.
+   */
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    TestJoinQuery.classTearDown();
+    client.executeQuery("DROP TABLE IF EXISTS customer_parts PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS nation_partitioned PURGE");
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartitionTableJoinSmallTable() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testNoProjectionJoinQual() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDown() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin2() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void selfJoinOfPartitionedTable() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test with an inline query: a UNION ALL of two scans of
+   * {@code customer_parts} filtered by {@code c_nationkey < 0}, left-outer-joined back to
+   * {@code customer_parts} with an additional {@code a.c_nationkey > 0} join condition.
+   * Both explain options are switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest(queries = {
+      @QuerySpec("select a.c_custkey, b.c_custkey from " +
+          "  (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          "   union all " +
+          "   select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          ") a " +
+          "left outer join customer_parts b " +
+          "on a.c_custkey = b.c_custkey " +
+          "and a.c_nationkey > 0")
+  })
+  public void testPartitionMultiplePartitionFilter() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests) and {@code sort = true}.
+   * NOTE(review): presumably exercises the empty data files that {@code setup()} adds to
+   * {@code nation_partitioned} -- confirm against the query resource for this test.
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Annotation-driven test: delegates to {@code runSimpleTests()} with both explain
+   * options switched off (see the class-level NOTE on plan tests) and {@code sort = true}.
+   * NOTE(review): presumably a second variant of the zero-length data file case --
+   * confirm against the query resource for this test.
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Left outer join between {@code lineitem} and a partitioned copy of it, where the
+   * join condition on the partition column ({@code b.l_orderkey = 1000}) is expected to
+   * produce no match, so every {@code key2} in the expected output is null.
+   */
+  @Test
+  public final void testCasebyCase1() throws Exception {
+    // Left outer join with a small table and a large partitioned table whose partition
+    // filter does not match any partition path.
+    String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
+    executeString(
+        "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
+            "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
+            "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
+            "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
+            "partition by column(l_orderkey int4) ").close();
+
+    try {
+      // Close the INSERT's result set as well, matching how the other tests in this class
+      // handle their INSERT statements.
+      executeString("insert overwrite into " + tableName +
+          " select l_partkey, l_suppkey, l_linenumber, \n" +
+          " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
+          " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
+          " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem").close();
+
+      ResultSet res = executeString(
+          "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
+              "left outer join " + tableName + " b " +
+              "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
+      );
+
+      String expected = "key1,key2\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "1,null\n" +
+          "2,null\n" +
+          "3,null\n" +
+          "3,null\n";
+      try {
+        assertEquals(expected, resultSetToString(res));
+      } finally {
+        // Release the query even when the comparison fails.
+        cleanupQuery(res);
+      }
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  // TODO: This test is currently disabled (its @Test annotation is commented out below);
+  //       it should be re-enabled after TAJO-1600 is resolved.
+//  @Test
+  /**
+   * Creates a table partitioned by two text columns, fills it from {@code orders}, and
+   * checks the result of a DISTINCT left outer join against {@code lineitem}.
+   */
+  public final void testBroadcastMultiColumnPartitionTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
+    ResultSet res = testBase.execute(
+        "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
+    res.close();
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      // The two partition columns are substrings of o_orderdate (positions 6-7 and 1-4).
+      res = executeString("insert overwrite into " + tableName
+          + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
+      res.close();
+
+      res = executeString(
+          "select distinct a.col3 from " + tableName + " as a " +
+              "left outer join lineitem b " +
+              "on a.col1 = b.l_orderkey order by a.col3"
+      );
+
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      // Always drop the table, whether or not the assertions passed.
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  /**
+   * Self-join of a table partitioned by {@code n_nationkey}: the result must equal the
+   * same self-join executed on the unpartitioned {@code nation} table.
+   */
+  @Test
+  public final void testSelfJoin() throws Exception {
+    final String partitionedTable = CatalogUtil.normalizeIdentifier("paritioned_nation");
+    final String createSql = "create table " + partitionedTable + " (n_name text,"
+        + "  n_comment text, n_regionkey int8) USING text "
+        + "WITH ('text.delimiter'='|')"
+        + "PARTITION BY column(n_nationkey int8)";
+    executeString(createSql).close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, partitionedTable));
+
+    try {
+      // Populate the partitioned table from nation.
+      final String insertSql = "insert overwrite into " + partitionedTable
+          + " select n_name, n_comment, n_regionkey, n_nationkey from nation";
+      executeString(insertSql).close();
+
+      // Reference result: the self-join on the plain nation table.
+      ResultSet referenceResult = executeString(
+          "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+              + " where a.n_nationkey in (1)");
+      final String expected = resultSetToString(referenceResult);
+      referenceResult.close();
+
+      // Same self-join on the partitioned table.
+      ResultSet partitionedResult = executeString(
+          "select a.n_nationkey, a.n_name from " + partitionedTable + " a join " + partitionedTable +
+              " b on a.n_nationkey = b.n_nationkey "
+              + " where a.n_nationkey in (1)");
+      final String actual = resultSetToString(partitionedResult);
+      partitionedResult.close();
+
+      assertEquals(expected, actual);
+      cleanupQuery(partitionedResult);
+    } finally {
+      executeString("drop table " + partitionedTable + " purge");
+    }
+  }
+
+  /**
+   * Regression test for https://issues.apache.org/jira/browse/TAJO-1102.
+   *
+   * The reported case was a self-join of a partitioned table:
+   *
+   *   CREATE TABLE orders_partition
+   *     (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
+   *        o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|')
+   *     PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
+   *
+   *   select a.o_orderstatus, count(*) as cnt
+   *    from orders_partition a
+   *    inner join orders_partition b
+   *      on a.o_orderdate = b.o_orderdate
+   *          and a.o_orderstatus = b.o_orderstatus
+   *          and a.o_orderkey = b.o_orderkey
+   *    where a.o_orderdate='1995-02-21'
+   *      and a.o_orderstatus in ('F')
+   *    group by a.o_orderstatus;
+   *
+   * Because of the where condition (a.o_orderdate='1995-02-21' and a.o_orderstatus in ('F')),
+   * the orders_partition table aliased as "a" is small and becomes a broadcast target.
+   *
+   * This test compares a self-join on a three-column-partitioned copy of {@code orders}
+   * with the equivalent self-join on {@code orders} itself.
+   */
+  @Test
+  public final void testSelfJoin2() throws Exception {
+    final String partitionedTable = CatalogUtil.normalizeIdentifier("partitioned_orders");
+    final String createSql =
+        "create table " + partitionedTable + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
+            "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|')\n" +
+            "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)";
+    executeString(createSql).close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, partitionedTable));
+
+    try {
+      // Populate the partitioned table; the third partition column is o_orderkey % 10.
+      final String insertSql =
+          "insert overwrite into " + partitionedTable +
+              " select o_orderkey, o_custkey, o_totalprice, " +
+              " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
+              " from orders ";
+      executeString(insertSql).close();
+
+      // Reference result computed on the unpartitioned orders table.
+      ResultSet referenceResult = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
+              "from orders a " +
+              "join orders b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
+              " order by a.o_orderkey"
+      );
+      final String expected = resultSetToString(referenceResult);
+      referenceResult.close();
+
+      // Same query against the partitioned table, filtering on all three partition columns.
+      ResultSet partitionedResult = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
+              "from " + partitionedTable +
+              " a join " + partitionedTable + " b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
+              " order by a.o_orderkey"
+      );
+      final String actual = resultSetToString(partitionedResult);
+      partitionedResult.close();
+
+      cleanupQuery(partitionedResult);
+      assertEquals(expected, actual);
+    } finally {
+      executeString("drop table " + partitionedTable + " purge");
+    }
+  }
+
+  /**
+   * Runs the DDL and INSERT resource files for the broadcast-partition case, then the
+   * annotation-driven simple tests, and always drops {@code customer_broad_parts}
+   * afterwards. Both explain options are switched off (see the class-level NOTE on plan tests).
+   */
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastPartitionTable() throws Exception {
+    // If all tables participate in the BROADCAST JOIN, there is some missing data.
+    // NOTE(review): the DDL file presumably creates customer_broad_parts, the table dropped below.
+    executeDDL("customer_partition_ddl.sql", null);
+
+    // The INSERT runs inside the try so that a failing insert still drops the table.
+    try {
+      ResultSet res = executeFile("insert_into_customer_partition.sql");
+      res.close();
+
+      runSimpleTests();
+    } finally {
+      executeString("DROP TABLE customer_broad_parts PURGE");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
new file mode 100644
index 0000000..2fddbfa
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestJoinQuery extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestJoinQuery.class);
+  // Number of setup() calls not yet matched by classTearDown(). The common tables are
+  // created when the count leaves zero and dropped when it returns to zero.
+  private static int reference = 0;
+
+  public TestJoinQuery(String joinOption) throws Exception {
+    super(TajoConstants.DEFAULT_DATABASE_NAME, joinOption);
+
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024));
+
+    testingCluster.setAllTajoDaemonConfValue(
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
+
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    if (joinOption.indexOf("NoBroadcast") >= 0) {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "-1");
+    }
+
+    if (joinOption.indexOf("Hash") >= 0) {
+      testingCluster.setAllTajoDaemonConfValue(
+          ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+          String.valueOf(256 * 1048576));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(256 * 1048576));
+    }
+    if (joinOption.indexOf("Sort") >= 0) {
+      testingCluster.setAllTajoDaemonConfValue(
+          ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+          String.valueOf(1));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(1));
+    }
+  }
+
+  /**
+   * Supplies the join options for the Parameterized runner; each row is the single
+   * constructor argument {@code joinOption}.
+   */
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    final Object[][] joinOptions = {
+        {"Hash_NoBroadcast"},
+        {"Sort_NoBroadcast"},
+        {"Hash"},
+        {"Sort"},
+    };
+    return Arrays.asList(joinOptions);
+  }
+
+  /**
+   * Registers one more user of the common join tables and creates them if this is the
+   * first user. The counter is incremented before the tables are created.
+   */
+  public static void setup() throws Exception {
+    final boolean firstUser = (reference == 0);
+    reference++;
+    if (firstUser) {
+      createCommonTables();
+    }
+  }
+
+  /**
+   * Restores the join-related configuration values changed by the constructor to their
+   * defaults, then releases one user of the common tables and drops them when no user
+   * is left.
+   */
+  public static void classTearDown() throws SQLException {
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname,
+        ConfVars.$TEST_BROADCAST_JOIN_ENABLED.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname,
+        ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    // Counterpart of the reference counting in setup().
+    if (--reference == 0) {
+      dropCommonTables();
+    }
+  }
+
+  protected static void createCommonTables() throws Exception {
+    LOG.info("Create common tables for join tests");
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    String[] data = new String[]{"1|table11-1", "2|table11-2", "3|table11-3", "4|table11-4", "5|table11-5"};
+    TajoTestingCluster.createTable("jointable11", schema, tableOptions, data, 2);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1|table12-1", "2|table12-2"};
+    TajoTestingCluster.createTable("jointable12", schema, tableOptions, data, 2);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"2|table13-2", "3|table13-3"};
+    TajoTestingCluster.createTable("jointable13", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1|table14-1", "2|table14-2", "3|table14-3", "4|table14-4"};
+    TajoTestingCluster.createTable("jointable14", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{};
+    TajoTestingCluster.createTable("jointable15", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1000000|a", "1000001|b", "2|c", "3|d", "4|e"};
+    TajoTestingCluster.createTable("jointable1", schema, tableOptions, data, 1);
+
+    data = new String[10000];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i;
+    }
+    TajoTestingCluster.createTable("jointable_large", schema, tableOptions, data, 2);
+
+    // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
+    // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node
+    createMultiFile("nation", 2, new TupleCreator() {
+      public Tuple createTuple(String[] columnDatas) {
+        return new VTuple(new Datum[]{
+            new Int4Datum(Integer.parseInt(columnDatas[0])),
+            new TextDatum(columnDatas[1]),
+            new Int4Datum(Integer.parseInt(columnDatas[2])),
+            new TextDatum(columnDatas[3])
+        });
+      }
+    });
+    addEmptyDataFile("nation_multifile", false);
+  }
+
+  protected static void dropCommonTables() throws SQLException {
+    LOG.info("Clear common tables for join tests");
+
+    client.executeQuery("DROP TABLE IF EXISTS jointable11 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable12 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable13 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable14 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable15 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable1 PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS jointable_large PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS nation_multifile PURGE");
+  }
+
+  /**
+   * Converts one text row, already split into its column strings, into a {@link Tuple}.
+   * Used by {@code createMultiFile} to build the tuples it appends to each data file.
+   */
+  interface TupleCreator {
+    /**
+     * @param columnDatas the column values of one row, in file order
+     * @return the tuple to append for that row
+     */
+    Tuple createTuple(String[] columnDatas);
+  }
+
+  /**
+   * Renders the root columns of an existing table as a comma-separated column list for
+   * a CREATE TABLE statement, e.g. {@code "id INT4,name TEXT"}. A length suffix such as
+   * {@code (10)} is appended when the column's data type reports a positive length.
+   *
+   * @param tableName name of the table whose schema is read through the client
+   * @return the column list, or an empty string when the table has no root columns
+   */
+  private static String buildSchemaString(String tableName) throws TajoException {
+    TableDesc desc = client.getTableDesc(tableName);
+    // Local, single-threaded use: StringBuilder is sufficient.
+    StringBuilder sb = new StringBuilder();
+    for (Column column : desc.getSchema().getRootColumns()) {
+      // Put the separator before every column but the first, so no trailing comma
+      // has to be removed (removal would fail on an empty column list).
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      TajoDataTypes.DataType dataType = column.getDataType();
+      sb.append(column.getSimpleName()).append(" ").append(dataType.getType());
+      if (dataType.getLength() > 0) {
+        sb.append("(").append(dataType.getLength()).append(")");
+      }
+    }
+    return sb.toString();
+  }
+
+  private static String buildMultifileDDlString(String tableName) throws TajoException {
+    String multiTableName = tableName + "_multifile";
+    StringBuilder sb = new StringBuilder("create table ").append(multiTableName).append(" (");
+    sb.append(buildSchemaString(tableName)).append(" )");
+    return sb.toString();
+  }
+
+  /**
+   * Creates the table {@code <tableName>_multifile} and fills it with the rows of
+   * {@code <tableName>.tbl}, written as several small data files named
+   * {@code 0.csv}, {@code 1.csv}, ... under the table's URI.
+   *
+   * The source file is looked up at {@code src/test/tpch/} relative to the working
+   * directory and, if absent there, under {@code <user.dir>/tajo-core/src/test/tpch/}.
+   *
+   * @param tableName       existing table whose schema and .tbl file are used
+   * @param numRowsEachFile number of rows written to each data file
+   * @param tupleCreator    converts the '|'-separated columns of a row into a tuple
+   */
+  protected static void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception {
+    // make multiple small file
+    String multiTableName = tableName + "_multifile";
+    String sql = buildMultifileDDlString(tableName);
+    client.executeQueryAndGetResult(sql);
+
+    TableDesc table = client.getTableDesc(multiTableName);
+    assertNotNull(table);
+
+    TableMeta tableMeta = table.getMeta();
+    Schema schema = table.getLogicalSchema();
+
+    File file = new File("src/test/tpch/" + tableName + ".tbl");
+
+    if (!file.exists()) {
+      file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl");
+    }
+    String[] rows = FileUtil.readTextFile(file).split("\n");
+
+    assertTrue(rows.length > 0);
+
+    int fileIndex = 0;
+
+    Appender appender = null;
+    try {
+      for (int i = 0; i < rows.length; i++) {
+        if (i % numRowsEachFile == 0) {
+          // Start of a new file: finish the previous one first.
+          if (appender != null) {
+            Appender completed = appender;
+            appender = null; // handed over, so the finally block cannot close it a second time
+            flushAndClose(completed);
+          }
+          Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv");
+          fileIndex++;
+          appender = (((FileTablespace) TablespaceManager.getLocalFs()))
+              .getAppender(tableMeta, schema, dataPath);
+          appender.init();
+        }
+        String[] columnDatas = rows[i].split("\\|");
+        Tuple tuple = tupleCreator.createTuple(columnDatas);
+        appender.addTuple(tuple);
+      }
+    } finally {
+      // Finishes the last file on success and releases the open appender on failure.
+      if (appender != null) {
+        flushAndClose(appender);
+      }
+    }
+  }
+
+  /** Flushes the appender and closes it even when the flush fails. */
+  private static void flushAndClose(Appender appender) throws Exception {
+    try {
+      appender.flush();
+    } finally {
+      appender.close();
+    }
+  }
+
+  /**
+   * Creates a zero-length file named {@code 0_empty.csv} in the table's directory or,
+   * for a partitioned table, in every directory returned by
+   * {@link #getPartitionPathList(FileSystem, Path)}.
+   *
+   * @param tableName     table whose URI is resolved through the client
+   * @param isPartitioned whether to descend into the partition directories
+   */
+  protected static void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception {
+    TableDesc table = client.getTableDesc(tableName);
+
+    Path tableRoot = new Path(table.getUri());
+    FileSystem fs = tableRoot.getFileSystem(conf);
+
+    List<Path> targetDirs = isPartitioned
+        ? getPartitionPathList(fs, tableRoot)
+        : Arrays.asList(tableRoot);
+
+    for (Path targetDir : targetDirs) {
+      Path emptyFile = new Path(targetDir, 0 + "_empty.csv");
+      // Create and immediately close: the result is a zero-length file.
+      OutputStream stream = fs.create(emptyFile);
+      stream.close();
+    }
+  }
+
+  /**
+   * Recursively collects directories under {@code path} that directly contain a file.
+   * Entries are visited in listing order: sub-directories are descended into, and as
+   * soon as a plain file is met, {@code path} itself is added and the scan of this
+   * directory stops.
+   *
+   * @param fs   file system to list
+   * @param path directory to scan
+   * @return the collected directories; empty when the listing is null or empty
+   */
+  protected static List<Path> getPartitionPathList(FileSystem fs, Path path) throws Exception {
+    FileStatus[] entries = fs.listStatus(path);
+    List<Path> collected = new ArrayList<Path>();
+    if (entries == null) {
+      return collected;
+    }
+
+    for (FileStatus entry : entries) {
+      if (!entry.isFile()) {
+        collected.addAll(getPartitionPathList(fs, entry.getPath()));
+        continue;
+      }
+      // First plain file found: this directory holds data, stop scanning it.
+      collected.add(path);
+      return collected;
+    }
+
+    return collected;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
new file mode 100644
index 0000000..d3cde3d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.SQLException;
+
+/**
+ * Join tests that combine several join types in a single query. The class runs once
+ * per join option of {@link TestJoinQuery#generateParameters()}, and every test
+ * enables both explain options.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestMultipleJoinTypes extends TestJoinQuery {
+
+  /**
+   * @param joinOption join option string, passed unchanged to {@link TestJoinQuery}
+   */
+  public TestMultipleJoinTypes(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  /** Delegates to the shared {@link TestJoinQuery#setup()}. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+  }
+
+  /** Delegates to the shared {@link TestJoinQuery#classTearDown()}. */
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    TestJoinQuery.classTearDown();
+  }
+
+  // The annotations below use the unqualified names Option / SimpleTest consistently;
+  // they are nested types inherited from QueryTestCaseBase.
+
+  /** Annotation-driven test; delegates to {@code runSimpleTests()}. */
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinTypes() throws Exception {
+    runSimpleTests();
+  }
+
+  /** Annotation-driven test; delegates to {@code runSimpleTests()}. */
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinsWithCaseWhen() throws Exception {
+    runSimpleTests();
+  }
+
+  /** Annotation-driven test; delegates to {@code runSimpleTests()}. */
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinsWithCaseWhen2() throws Exception {
+    runSimpleTests();
+  }
+
+  /**
+   * Inner join followed by a left outer join against {@code customer_broad_parts} with
+   * the condition {@code c.c_custkey < 0}. The partitioned table is created and filled
+   * by the {@code prepare} statements and dropped by the {@code cleanup} statement.
+   */
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest(prepare = {
+      "CREATE TABLE customer_broad_parts (" +
+          "  c_nationkey INT4," +
+          "  c_name    TEXT," +
+          "  c_address    TEXT," +
+          "  c_phone    TEXT," +
+          "  c_acctbal    FLOAT8," +
+          "  c_mktsegment    TEXT," +
+          "  c_comment    TEXT" +
+          ") PARTITION BY COLUMN (c_custkey INT4)",
+      "INSERT OVERWRITE INTO customer_broad_parts" +
+          "  SELECT" +
+          "    c_nationkey," +
+          "    c_name," +
+          "    c_address," +
+          "    c_phone," +
+          "    c_acctbal," +
+          "    c_mktsegment," +
+          "    c_comment," +
+          "    c_custkey" +
+          "  FROM customer"
+  }, cleanup = {
+      "DROP TABLE customer_broad_parts PURGE"
+  }, queries = {
+      @QuerySpec("select a.l_orderkey, b.o_orderkey, c.c_custkey from lineitem a " +
+          "inner join orders b on a.l_orderkey = b.o_orderkey " +
+          "left outer join customer_broad_parts c on a.l_orderkey = c.c_custkey and c.c_custkey < 0")
+  })
+  public final void testInnerAndOuterWithEmpty() throws Exception {
+    runSimpleTests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
new file mode 100644
index 0000000..bd8f830
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+/**
+ * Query tests that run against {@code table1} and {@code table2}, which are created from
+ * {@code table1_ddl.sql} and {@code table2_ddl.sql} before each test.
+ *
+ * <p>Every test is a no-op when the testing cluster runs with HiveCatalogStore. The SQL shown
+ * in each test's comment documents the intended query; the statement actually executed is
+ * whatever {@code executeQuery()} resolves for the test (presumably a per-test resource;
+ * confirm in {@code QueryTestCaseBase}).
+ */
+public class TestNetTypes extends QueryTestCaseBase {
+
+  /**
+   * Creates {@code table1} and {@code table2} unless HiveCatalogStore is in use.
+   */
+  @Before
+  public final void setUp() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    executeDDL("table1_ddl.sql", "table1");
+    executeDDL("table2_ddl.sql", "table2");
+  }
+
+  @Test
+  public final void testSelect() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select name, addr from table1;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void testGroupby() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select name, addr, count(1) from table1 group by name, addr;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void testGroupby2() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select addr, count(*) from table1 group by addr;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void testSort() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select * from table1 order by addr;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void testSort2() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select addr from table2 order by addr;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void testJoin() throws Exception {
+    // Skip this test when HiveCatalogStore is used.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    // select * from table1 as t1, table2 as t2 where t1.addr = t2.addr;
+    ResultSet res = executeQuery();
+    try {
+      assertResultSet(res);
+    } finally {
+      // Release the result even when the assertion above fails.
+      cleanupQuery(res);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
new file mode 100644
index 0000000..66848e6
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.Assert.*;
+
+/**
+ * This is the unit test for null values. It needs specialized data sets,
+ * so we separated it from the other unit tests that use the TPC-H data set.
+ */
+@Category(IntegrationTest.class)
+public class TestNullValues {
+
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    client =  TpchTestBase.getInstance().getTestingCluster().newTajoClient();
+  }
+
+  /**
+   * Closes the shared client.
+   *
+   * <p>JUnit runs {@code @AfterClass} methods even when {@code @BeforeClass} threw, in which
+   * case {@code client} was never assigned. The null check keeps a NullPointerException here
+   * from hiding the original set-up failure.
+   */
+  @AfterClass
+  public static void tearDown() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  /**
+   * Loads three rows into {@code nulltable1} (INT4, TEXT, FLOAT4), one of which has empty
+   * second and third fields, and checks that {@code col3 is null} returns only that row.
+   */
+  @Test
+  public final void testIsNull() throws Exception {
+    Schema tableSchema = new Schema();
+    tableSchema.addColumn("col1", Type.INT4);
+    tableSchema.addColumn("col2", Type.TEXT);
+    tableSchema.addColumn("col3", Type.FLOAT4);
+
+    String[] rows = {
+        "1|filled|0.1",
+        "2||",
+        "3|filled|0.2"
+    };
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+
+    ResultSet rs = TajoTestingCluster.run(
+        new String[]{"nulltable1"},
+        new Schema[]{tableSchema},
+        tableOptions,
+        new String[][]{rows},
+        "select * from nulltable1 where col3 is null",
+        client);
+
+    try {
+      // Only the row with col1 = 2 has an empty col3.
+      assertTrue(rs.next());
+      assertEquals(2, rs.getInt(1));
+      assertFalse(rs.next());
+    } finally {
+      rs.close();
+    }
+  }
+
+  /**
+   * Loads three rows into {@code nulltable2} (INT4, TEXT), one of which has an empty first
+   * field, and checks that {@code col1 is not null} returns the rows with col1 = 1 and 3,
+   * in that order.
+   *
+   * <p>NOTE(review): each data line carries a third, empty field that has no column in the
+   * schema; presumably the text reader ignores it. Confirm whether this is intentional.
+   */
+  @Test
+  public final void testIsNotNull() throws Exception {
+    Schema tableSchema = new Schema();
+    tableSchema.addColumn("col1", Type.INT4);
+    tableSchema.addColumn("col2", Type.TEXT);
+
+    String[] rows = {
+        "1|filled|",
+        "||",
+        "3|filled|"
+    };
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+
+    ResultSet rs = TajoTestingCluster.run(
+        new String[]{"nulltable2"},
+        new Schema[]{tableSchema},
+        tableOptions,
+        new String[][]{rows},
+        "select * from nulltable2 where col1 is not null",
+        client);
+
+    try {
+      assertTrue(rs.next());
+      assertEquals(1, rs.getInt(1));
+      assertTrue(rs.next());
+      assertEquals(3, rs.getInt(1));
+      assertFalse(rs.next());
+    } finally {
+      rs.close();
+    }
+  }
+
+  /**
+   * Loads two comma-delimited rows into {@code nulltable3} (ten INT8 columns) and checks
+   * that filtering on {@code col1}..{@code col3} being null and {@code col4 = 43578}
+   * returns exactly the second row.
+   *
+   * <p>Despite the method name, the query uses {@code is null} predicates.
+   */
+  @Test
+  public final void testIsNotNull2() throws Exception {
+    // Ten INT8 columns named col1 .. col10.
+    Schema tableSchema = new Schema();
+    for (int ordinal = 1; ordinal <= 10; ordinal++) {
+      tableSchema.addColumn("col" + ordinal, Type.INT8);
+    }
+
+    String[] rows = {
+        ",,,,672287821,1301460,1,313895860387,126288907,1024",
+        ",,,43578,19,13,6,3581,2557,1024"
+    };
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, ",");
+
+    ResultSet rs = TajoTestingCluster.run(
+        new String[]{"nulltable3"},
+        new Schema[]{tableSchema},
+        tableOptions,
+        new String[][]{rows},
+        "select * from nulltable3 where col1 is null and col2 is null and col3 is null and col4 = 43578",
+        client);
+
+    try {
+      assertTrue(rs.next());
+      assertEquals(43578, rs.getLong(4));
+      assertFalse(rs.next());
+    } finally {
+      rs.close();
+    }
+  }
+
+  /**
+   * Like {@code testIsNotNull2}, but the data mixes empty fields with an explicit
+   * {@code \N} token, {@code StorageConstants.TEXT_NULL} is set, and the rows are shorter
+   * than the ten-column schema. Expects exactly one row whose {@code col4} is 43578 and
+   * whose {@code col1}, {@code col2}, {@code col3} and {@code col5} are null.
+   *
+   * <p>Despite the method name, the query uses {@code is null} predicates.
+   */
+  @Test
+  public final void testIsNotNull3() throws Exception {
+    // Ten INT8 columns named col1 .. col10.
+    Schema tableSchema = new Schema();
+    for (int ordinal = 1; ordinal <= 10; ordinal++) {
+      tableSchema.addColumn("col" + ordinal, Type.INT8);
+    }
+
+    String[] rows = {
+        "\\N,,,,672287821,",
+        ",\\N,,43578"
+    };
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, ",");
+    // The null token is passed in escaped form.
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    ResultSet rs = TajoTestingCluster.run(
+        new String[]{"nulltable4"},
+        new Schema[]{tableSchema},
+        tableOptions,
+        new String[][]{rows},
+        "select * from nulltable4 where col1 is null and col2 is null and col3 is null and col5 is null and col4 = 43578",
+        client);
+
+    try {
+      assertTrue(rs.next());
+      assertEquals(43578, rs.getLong(4));
+      assertFalse(rs.next());
+    } finally {
+      rs.close();
+    }
+  }
+
+  /**
+   * Selects all four columns of the null-table fixture and checks both the rendered rows
+   * (null values print as {@code null}) and the typed getters on the null column of each
+   * row, addressed by index and by column name.
+   */
+  @Test
+  public final void testResultSetNullSimpleQuery() throws Exception {
+    final String tableName = "nulltable5";
+    final int columnCount = 4;
+    final String expected =
+            "null|a|1.0|true\n" +
+            "2|null|2.0|false\n" +
+            "3|c|null|true\n" +
+            "4|d|4.0|null";
+
+    ResultSet rs = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client);
+    try {
+      StringBuilder actual = new StringBuilder();
+      int rowCount = 0;
+
+      while (rs.next()) {
+        // Rows are separated by '\n', fields within a row by '|'.
+        if (rowCount > 0) {
+          actual.append("\n");
+        }
+        for (int column = 1; column <= columnCount; column++) {
+          if (column > 1) {
+            actual.append("|");
+          }
+          actual.append(rs.getObject(column));
+        }
+
+        // Row k of the fixture has its null in column k + 1.
+        assertResultSetNull(rs, rowCount, false, new int[]{1,2,3,4});
+        assertResultSetNull(rs, rowCount, true, new int[]{1,2,3,4});
+        rowCount++;
+      }
+
+      assertEquals(4, rowCount);
+      assertEquals(expected, actual.toString());
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+    }
+  }
+
+  /**
+   * Selects {@code col1}..{@code col3} of the null-table fixture, each followed by a
+   * {@code coalesce} of it with a default, plus {@code col4}. Checks the rendered rows and
+   * the typed getters on the raw (non-coalesced) null column of each row, by index and by
+   * column name.
+   */
+  @Test
+  public final void testResultSetNull() throws Exception {
+    final String tableName = "nulltable6";
+    final int columnCount = 7;
+    final String query = "select " +
+        "col1, coalesce(col1, 99999), " +
+        "col2, coalesce(col2, 'null_value'), " +
+        "col3, coalesce(col3, 99999.0)," +
+        "col4 " +
+        "from " + tableName;
+    final String expected =
+        "null|99999|a|a|1.0|1.0|true\n" +
+        "2|2|null|null_value|2.0|2.0|false\n" +
+        "3|3|c|c|null|99999.0|true\n" +
+        "4|4|d|d|4.0|4.0|null";
+
+    ResultSet rs = runNullTableQuery(tableName, query, client);
+    try {
+      StringBuilder actual = new StringBuilder();
+      int rowCount = 0;
+
+      while (rs.next()) {
+        // Rows are separated by '\n', fields within a row by '|'.
+        if (rowCount > 0) {
+          actual.append("\n");
+        }
+        for (int column = 1; column <= columnCount; column++) {
+          if (column > 1) {
+            actual.append("|");
+          }
+          actual.append(rs.getObject(column));
+        }
+
+        // The raw columns sit at result positions 1, 3, 5 and 7.
+        assertResultSetNull(rs, rowCount, false, new int[]{1,3,5,7});
+        assertResultSetNull(rs, rowCount, true, new int[]{1,3,5,7});
+        rowCount++;
+      }
+
+      assertEquals(4, rowCount);
+      assertEquals(expected, actual.toString());
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+    }
+  }
+
+  /**
+   * Creates the four-row null fixture under {@code tableName} and runs {@code query}
+   * against it. The fixture has columns (INT4, TEXT, FLOAT4, BOOLEAN), and row k holds the
+   * null token in column k.
+   *
+   * @param tableName name of the table to create
+   * @param query     SQL to execute
+   * @param client    client to run with; when {@code null}, the {@code TajoTestingCluster.run}
+   *                  overload without a client argument is used. Note that this parameter
+   *                  shadows the static field of the same name.
+   * @return the result set of {@code query}
+   */
+  private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception {
+    Schema fixtureSchema = new Schema();
+    fixtureSchema.addColumn("col1", Type.INT4);
+    fixtureSchema.addColumn("col2", Type.TEXT);
+    fixtureSchema.addColumn("col3", Type.FLOAT4);
+    fixtureSchema.addColumn("col4", Type.BOOLEAN);
+
+    String[] rows = {
+        "\\N|a|1.0|t",
+        "2|\\N|2.0|f",
+        "3|c|\\N|t",
+        "4|d|4.0|\\N"
+    };
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    String[] tableNames = {tableName};
+    Schema[] tableSchemas = {fixtureSchema};
+    String[][] tableData = {rows};
+
+    if (client != null) {
+      return TajoTestingCluster.run(tableNames, tableSchemas, tableOptions, tableData, query, client);
+    }
+    return TajoTestingCluster.run(tableNames, tableSchemas, tableOptions, tableData, query);
+  }
+
+  /**
+   * Asserts that the null column of the current fixture row reads back as the JDBC default
+   * for SQL NULL through the typed getter matching that row:
+   * row 0 uses {@code getInt} (0), row 1 {@code getString} (null),
+   * row 2 {@code getDouble} (0.0) and row 3 {@code getBoolean} (false).
+   * Rows outside 0..3 are not checked.
+   *
+   * @param res       result set positioned on the row to check
+   * @param numRows   zero-based number of the current row; also selects the entry of
+   *                  {@code nullIndex} to use
+   * @param useName   when true, the column is addressed by the name reported by the result
+   *                  set meta data instead of by index
+   * @param nullIndex for each row number, the 1-based result column expected to be null
+   * @throws SQLException if reading the result set or its meta data fails
+   */
+  private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException {
+    if (numRows < 0 || numRows > 3) {
+      return;
+    }
+
+    int column = nullIndex[numRows];
+    // The meta data is only consulted when the column is addressed by name.
+    String columnName = useName ? res.getMetaData().getColumnName(column) : null;
+
+    switch (numRows) {
+      case 0:
+        assertEquals(0, useName ? res.getInt(columnName) : res.getInt(column));
+        break;
+      case 1:
+        assertNull(useName ? res.getString(columnName) : res.getString(column));
+        break;
+      case 2:
+        // Exact comparison: the previous tolerance of 10 also accepted every non-null
+        // value in the fixture (1.0, 2.0, 4.0), so it could not detect a wrong result.
+        assertEquals(0.0, useName ? res.getDouble(columnName) : res.getDouble(column), 0.0);
+        break;
+      case 3:
+        assertFalse(useName ? res.getBoolean(columnName) : res.getBoolean(column));
+        break;
+      default:
+        break;
+    }
+  }
+}