You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:13 UTC
[34/51] [partial] tajo git commit: TAJO-1761: Separate an integration
unit test kit into an independent module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
new file mode 100644
index 0000000..d0088a5
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -0,0 +1,882 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.sql.ResultSet;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestInsertQuery extends QueryTestCaseBase {
+
+  /** Name pattern that every data file written by the insert queries in this class must match. */
+  private static final String DATA_FILE_NAME_PATTERN = "part-[0-9]*-[0-9]*-[0-9]*";
+
+  /** Expected data file contents shared by the four INSERT OVERWRITE ... UNION tests. */
+  private static final String UNION_INSERT_EXPECTED =
+      "1|1|17.0\n" +
+      "1|1|36.0\n" +
+      "2|2|38.0\n" +
+      "3|2|45.0\n" +
+      "3|3|49.0\n" +
+      "1|3|173665.47\n" +
+      "2|4|46929.18\n" +
+      "3|2|193846.25\n";
+
+  @Test
+  public final void testInsertOverwrite() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      res = executeFile("testInsertOverwrite.sql");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+    } finally {
+      // Drop even when an assertion fails; other tests in this class recreate table1.
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertInto() throws Exception {
+    // create table and upload test data
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      res = executeFile("testInsertOverwrite.sql");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeFile("testInsertInto.sql");
+      res.close();
+
+      // The second load must add a second data file rather than replace the first one.
+      List<Path> dataFiles = listTableFiles("table1");
+      assertEquals(2, dataFiles.size());
+
+      for (int i = 0; i < dataFiles.size(); i++) {
+        String name = dataFiles.get(i).getName();
+        assertTrue(name.matches(DATA_FILE_NAME_PATTERN));
+        // The fourth dash-separated token must equal the file's position in the listing.
+        String[] tokens = name.split("-");
+        assertEquals(4, tokens.length);
+        assertEquals(i, Integer.parseInt(tokens[3]));
+      }
+
+      String tableDatas = getTableFileContents("table1");
+
+      String oneLoad = "1|1|17.0\n" +
+          "1|1|36.0\n" +
+          "2|2|38.0\n" +
+          "3|2|45.0\n" +
+          "3|3|49.0\n";
+
+      assertNotNull(tableDatas);
+      assertEquals(oneLoad + oneLoad, tableDatas);
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertIntoLocation() throws Exception {
+    Path dfsPath = new Path("/tajo-data/testInsertIntoLocation");
+    assertTestInsertIntoLocation(dfsPath);
+  }
+
+  @Test
+  public final void testInsertIntoLocationDifferentFSs() throws Exception {
+    Path localPath = CommonTestingUtil.getTestDir();
+    assertTestInsertIntoLocation(localPath);
+  }
+
+  /**
+   * Runs the same INSERT INTO LOCATION query twice against {@code path} and verifies that each
+   * run adds exactly one data file with the expected rows. The directory is always deleted
+   * afterwards, even if an assertion fails.
+   *
+   * @param path target location; resolved through {@code path.getFileSystem(...)}, so it may
+   *             live on a file system other than the cluster default
+   */
+  public final void assertTestInsertIntoLocation(Path path) throws Exception {
+    String insertQuery = "insert into location '" + path +
+        "' select l_orderkey, l_partkey, l_linenumber from default.lineitem";
+    String expected = "1|1|1\n" +
+        "1|1|2\n" +
+        "2|2|1\n" +
+        "3|2|1\n" +
+        "3|3|2\n";
+
+    // Resolve the file system before running anything so the finally block can always clean up.
+    FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
+    try {
+      executeString(insertQuery).close();
+      assertEquals(expected, getTableFileContents(path));
+      assertDataFiles(fs, path, 1);
+
+      // A second INSERT INTO appends: the data doubles and a second file appears.
+      executeString(insertQuery).close();
+      assertEquals(expected + expected, getTableFileContents(path));
+      assertDataFiles(fs, path, 2);
+    } finally {
+      fs.delete(path, true);
+    }
+  }
+
+  /**
+   * Asserts that {@code dir} contains exactly {@code expectedCount} entries and that every entry
+   * is named like a data file.
+   */
+  private static void assertDataFiles(FileSystem fs, Path dir, int expectedCount) throws Exception {
+    FileStatus[] files = fs.listStatus(dir);
+    assertNotNull(files);
+    assertEquals(expectedCount, files.length);
+    for (FileStatus eachFileStatus : files) {
+      assertTrue(eachFileStatus.getPath().getName().matches(DATA_FILE_NAME_PATTERN));
+    }
+  }
+
+  @Test
+  public final void testInsertIntoPartitionedTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoPartitionedTable");
+    executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" +
+        "USING csv PARTITION by column(n_nationkey INT4)" ).close();
+
+    try {
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
+
+      ResultSet res = executeString("select * from " + tableName);
+
+      String expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
+      res = executeString("select * from " + tableName);
+      expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+      TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName);
+      assertNotNull(tableDesc);
+
+      Path path = new Path(tableDesc.getUri());
+      FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
+
+      // Expect one "n_nationkey=..." directory per nation key, each holding one data file per insert.
+      FileStatus[] partitionDirs = fs.listStatus(path);
+      assertNotNull(partitionDirs);
+      assertEquals(25, partitionDirs.length);
+
+      for (FileStatus eachPartition : partitionDirs) {
+        assertTrue(eachPartition.getPath().getName().startsWith("n_nationkey="));
+        FileStatus[] dataFiles = fs.listStatus(eachPartition.getPath());
+        assertEquals(2, dataFiles.length);
+        for (FileStatus eachDataFileStatus : dataFiles) {
+          assertTrue(eachDataFileStatus.getPath().getName().matches(DATA_FILE_NAME_PATTERN));
+        }
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteSmallerColumns() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+
+      res = executeFile("testInsertOverwriteSmallerColumns.sql");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+      // Inserting fewer columns must not change the table schema.
+      assertEquals(originalDesc.getSchema(), desc.getSchema());
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithTargetColumns() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+
+      res = executeFile("testInsertOverwriteWithTargetColumns.sql");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeString("select * from " + CatalogUtil.denormalizeIdentifier(getCurrentDatabase()) + ".table1");
+
+      assertNextRowWithNullSecondColumn(res, 1, 17.0f);
+      assertNextRowWithNullSecondColumn(res, 1, 36.0f);
+      assertNextRowWithNullSecondColumn(res, 2, 38.0f);
+      assertNextRowWithNullSecondColumn(res, 3, 45.0f);
+      assertNextRowWithNullSecondColumn(res, 3, 49.0f);
+
+      assertFalse(res.next());
+      res.close();
+
+      assertEquals(originalDesc.getSchema(), desc.getSchema());
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  /**
+   * Advances {@code res} to its next row and checks it: column 1 equals {@code expectedCol1},
+   * column 2 is NULL (getFloat returns 0 and wasNull() is true), and column 3 equals
+   * {@code expectedCol3}.
+   */
+  private static void assertNextRowWithNullSecondColumn(ResultSet res, long expectedCol1, float expectedCol3)
+      throws Exception {
+    assertTrue(res.next());
+    assertEquals(expectedCol1, res.getLong(1));
+    assertEquals(0f, res.getFloat(2), 0f);
+    // wasNull() reports on the most recently read column, so it must directly follow getFloat(2).
+    assertTrue(res.wasNull());
+    assertEquals(expectedCol3, res.getFloat(3), 0f);
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsterisk() throws Exception {
+    ResultSet res = executeFile("full_table_csv_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_csv"));
+
+    try {
+      res = executeString("insert overwrite into full_table_csv select * from default.lineitem where l_orderkey = 3");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_csv");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(2, desc.getStats().getNumRows().intValue());
+      }
+    } finally {
+      executeString("DROP TABLE full_table_csv PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsteriskAndMore() throws Exception {
+    ResultSet res = executeFile("lineitem_year_month_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "lineitem_year_month"));
+
+    try {
+      res = executeFile("load_to_lineitem_year_month.sql");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "lineitem_year_month");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeQuery();
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE lineitem_year_month PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteIntoSelect() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("insertoverwriteintoselect");
+    ResultSet res = executeString("create table " + tableName + " as select l_orderkey from default.lineitem");
+    assertFalse(res.next());
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    try {
+      TableDesc orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, orderKeys.getStats().getNumRows().intValue());
+      }
+
+      // this query will result in the two rows.
+      res = executeString("insert overwrite into " + tableName + " select l_orderkey from default.lineitem where l_orderkey = 3");
+      assertFalse(res.next());
+      res.close();
+
+      assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+      orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(2, orderKeys.getStats().getNumRows().intValue());
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteCapitalTableName() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteCapitalTableName");
+    ResultSet res = executeString("create table " + tableName + " as select * from default.lineitem");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    try {
+      res = executeString("insert overwrite into " + tableName + " select * from default.lineitem where l_orderkey = 3");
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(2, desc.getStats().getNumRows().intValue());
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteLocation() throws Exception {
+    ResultSet res = executeQuery();
+    res.close();
+
+    // executeQuery() is expected to have written exactly one data file into this location.
+    Path path = new Path("/tajo-data/testInsertOverwriteCapitalTableName");
+    FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+    assertTrue(fs.exists(path));
+    assertEquals(1, fs.listStatus(path).length);
+  }
+
+  @Test
+  public final void testInsertOverwriteWithCompression() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteWithCompression");
+    ResultSet res = executeFile("testInsertOverwriteWithCompression_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    try {
+      res = executeQuery();
+      res.close();
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(2, desc.getStats().getNumRows().intValue());
+      }
+
+      FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+      Path tablePath = new Path(desc.getUri());
+      assertTrue(fs.exists(tablePath));
+      CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
+
+      // Guard against an empty directory, which would make the codec loop below pass vacuously.
+      FileStatus[] files = fs.listStatus(tablePath);
+      assertTrue(files.length > 0);
+      for (FileStatus file : files) {
+        CompressionCodec codec = factory.getCodec(file.getPath());
+        assertTrue(codec instanceof DeflateCodec);
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteLocationWithCompression() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      ResultSet res = executeQuery();
+      res.close();
+      FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+      Path path = new Path("/tajo-data/testInsertOverwriteLocationWithCompression");
+      assertTrue(fs.exists(path));
+
+      FileStatus[] files = fs.listStatus(path);
+      assertEquals(1, files.length);
+
+      CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
+      for (FileStatus file : files) {
+        CompressionCodec codec = factory.getCodec(file.getPath());
+        assertTrue(codec instanceof DeflateCodec);
+      }
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsteriskUsingParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      ResultSet res = executeFile("full_table_parquet_ddl.sql");
+      res.close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_parquet"));
+
+      try {
+        res = executeString(
+            "insert overwrite into full_table_parquet select * from default.lineitem where l_orderkey = 3");
+        res.close();
+        // The enclosing check already guarantees the Hive catalog store is not running.
+        TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_parquet");
+        assertEquals(2, desc.getStats().getNumRows().intValue());
+
+        res = executeString("select * from full_table_parquet;");
+        assertResultSet(res);
+        res.close();
+
+        res = executeString("select l_orderkey, l_partkey from full_table_parquet;");
+        assertResultSet(res, "testInsertOverwriteWithAsteriskUsingParquet2.result");
+        res.close();
+      } finally {
+        executeString("DROP TABLE full_table_parquet PURGE");
+      }
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteIntoParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      executeString("create table parquet_table " +
+          "(l_orderkey int4, l_shipdate text, l_shipdate_function text) using parquet").close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
+
+      try {
+        executeString(
+            "insert overwrite into parquet_table " +
+                "select l_orderkey, l_shipdate, substr(l_shipdate, 1, 10) from default.lineitem").close();
+
+        // The enclosing check already guarantees the Hive catalog store is not running.
+        TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+
+        ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
+            "from parquet_table ");
+
+        String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
+            "-------------------------------\n" +
+            "1,1996-03-13,1996-03-13\n" +
+            "1,1996-04-12,1996-04-12\n" +
+            "2,1997-01-28,1997-01-28\n" +
+            "3,1994-02-02,1994-02-02\n" +
+            "3,1993-11-09,1993-11-09\n";
+
+        assertEquals(expected, resultSetToString(res));
+        res.close();
+      } finally {
+        executeString("DROP TABLE parquet_table PURGE");
+      }
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteIntoPartitionedParquet() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      executeString("create table parquet_table " +
+          "(l_orderkey int4, l_shipdate_function text) using parquet partition by column (l_shipdate text)").close();
+
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
+
+      try {
+        executeString(
+            "insert overwrite into parquet_table " +
+                "select l_orderkey, substr(l_shipdate, 1, 10), l_shipdate from default.lineitem").close();
+
+        // The enclosing check already guarantees the Hive catalog store is not running.
+        TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+
+        ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
+            "from parquet_table ");
+
+        String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
+            "-------------------------------\n" +
+            "3,1993-11-09,1993-11-09\n" +
+            "3,1994-02-02,1994-02-02\n" +
+            "1,1996-03-13,1996-03-13\n" +
+            "1,1996-04-12,1996-04-12\n" +
+            "2,1997-01-28,1997-01-28\n";
+
+        assertEquals(expected, resultSetToString(res));
+        res.close();
+      } finally {
+        executeString("DROP TABLE parquet_table PURGE");
+      }
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithDatabase() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      res = executeQuery();
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(5, desc.getStats().getNumRows().intValue());
+      }
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    try {
+      res = executeString("insert overwrite into " + tableName
+          + " select 1::INT4, 2.1::FLOAT4, 'test'; ");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(1, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeString("select * from " + tableName + ";");
+      assertTrue(res.next());
+
+      assertEquals(3, res.getMetaData().getColumnCount());
+      assertEquals(1, res.getInt(1));
+      // A tight delta: the previous delta of 10 accepted practically any value.
+      assertEquals(2.1f, res.getFloat(2), 0.0001f);
+      assertEquals("test", res.getString(3));
+
+      res.close();
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery2");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    try {
+      res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+      if (!testingCluster.isHiveCatalogStoreRunning()) {
+        assertEquals(1, desc.getStats().getNumRows().intValue());
+      }
+
+      res = executeString("select * from " + tableName + ";");
+      assertTrue(res.next());
+
+      assertEquals(3, res.getMetaData().getColumnCount());
+      assertEquals(1, res.getInt(1));
+      // col2 was not a target column, so it must be NULL (read back as 0.0 by getDouble).
+      assertNull(res.getString(2));
+      assertEquals(0.0, res.getDouble(2), 0.0);
+      assertTrue(res.wasNull());
+      assertEquals("test", res.getString(3));
+
+      res.close();
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwritePathWithNonFromQuery() throws Exception {
+    ResultSet res = executeString("insert overwrite into location " +
+        "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " +
+        "USING csv WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+        "select 1::INT4, 2.1::FLOAT4, 'test'");
+
+    res.close();
+    FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+    Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery");
+    assertTrue(fs.exists(path));
+
+    FileStatus[] files = fs.listStatus(path);
+    assertEquals(1, files.length);
+
+    CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
+    FileStatus file = files[0];
+    CompressionCodec codec = factory.getCodec(file.getPath());
+    assertTrue(codec instanceof DeflateCodec);
+
+    // Decode with an explicit charset instead of the platform default.
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(codec.createInputStream(fs.open(file.getPath())), "UTF-8"));
+
+    try {
+      String line = reader.readLine();
+      assertNotNull(line);
+
+      String[] tokens = line.split("\\|");
+
+      assertEquals(3, tokens.length);
+      assertEquals("1", tokens[0]);
+      assertEquals("2.1", tokens[1]);
+      assertEquals("test", tokens[2]);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithUnion() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      res = executeFile("testInsertOverwriteWithUnion.sql");
+      res.close();
+
+      String tableDatas = getTableFileContents("table1");
+
+      assertNotNull(tableDatas);
+      assertEquals(UNION_INSERT_EXPECTED, tableDatas);
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteWithUnionDifferentAlias() throws Exception {
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    try {
+      res = executeFile("testInsertOverwriteWithUnionDifferentAlias.sql");
+      res.close();
+
+      String tableDatas = getTableFileContents("table1");
+
+      assertNotNull(tableDatas);
+      assertEquals(UNION_INSERT_EXPECTED, tableDatas);
+    } finally {
+      executeString("DROP TABLE table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testInsertOverwriteLocationWithUnion() throws Exception {
+    ResultSet res = executeFile("testInsertOverwriteLocationWithUnion.sql");
+    res.close();
+
+    String resultDatas = getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnion"));
+
+    assertNotNull(resultDatas);
+    assertEquals(UNION_INSERT_EXPECTED, resultDatas);
+  }
+
+  @Test
+  public final void testInsertOverwriteLocationWithUnionDifferenceAlias() throws Exception {
+    ResultSet res = executeFile("testInsertOverwriteLocationWithUnionDifferenceAlias.sql");
+    res.close();
+
+    String resultDatas = getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnionDifferenceAlias"));
+
+    assertNotNull(resultDatas);
+    assertEquals(UNION_INSERT_EXPECTED, resultDatas);
+  }
+
+  @Test
+  public final void testInsertWithDifferentColumnOrder() throws Exception {
+    ResultSet res = executeFile("nation_diff_col_order.ddl");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff"));
+
+    try {
+      res = executeFile("testInsertWithDifferentColumnOrder.sql");
+      res.close();
+
+      res = executeString("select * from nation_diff");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("drop table nation_diff purge;");
+    }
+  }
+
+  @Test
+  public final void testFixedCharSelectWithNoLength() throws Exception {
+    ResultSet res = executeFile("test1_nolength_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
+
+    try {
+      res = executeFile("testInsertIntoSelectWithFixedSizeCharWithNoLength.sql");
+      res.close();
+
+      // Check for null before dereferencing; then strip the \0 padding characters.
+      String rawDatas = getTableFileContents("test1");
+      assertNotNull(rawDatas);
+      String resultDatas = rawDatas.replace("\0", "");
+      String expected = "a\n";
+
+      assertEquals(expected.length(), resultDatas.length());
+      assertEquals(expected, resultDatas);
+    } finally {
+      executeString("DROP TABLE test1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testFixedCharSelect() throws Exception {
+    ResultSet res = executeFile("test1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
+
+    try {
+      res = executeFile("testInsertIntoSelectWithFixedSizeChar.sql");
+      res.close();
+
+      // Check for null before dereferencing; then strip the \0 padding characters.
+      String rawDatas = getTableFileContents("test1");
+      assertNotNull(rawDatas);
+      String resultDatas = rawDatas.replace("\0", "");
+      String expected = "a\n" +
+          "abc\n" +
+          "abcde\n";
+
+      assertEquals(expected.length(), resultDatas.length());
+      assertEquals(expected, resultDatas);
+    } finally {
+      executeString("DROP TABLE test1 PURGE");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
new file mode 100644
index 0000000..d2585a7
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * NOTE: Plan tests are disabled in TestJoinOnPartitionedTables.
+ * A plan reading a partitioned table currently contains HDFS paths to the input partitions.
+ * An example of a path to an input partition is hdfs://localhost:60305/tajo/warehouse/default/customer_parts/c_nationkey=1.
+ * Because a different HDFS port is used for each test run, it is difficult to test query plans that read partitioned tables.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+@NotThreadSafe
+public class TestJoinOnPartitionedTables extends TestJoinQuery {
+
+  public TestJoinOnPartitionedTables(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  /**
+   * Creates the shared join tables (via {@link TestJoinQuery#setup()}) plus the partitioned
+   * tables customer_parts and nation_partitioned used only by this class.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+    client.executeQuery("CREATE TABLE if not exists customer_parts " +
+        "(c_custkey INT4, c_name TEXT, c_address TEXT, c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT) " +
+        "PARTITION BY COLUMN (c_nationkey INT4) as " +
+        "SELECT c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey FROM customer;");
+    client.executeQueryAndGetResult("create table if not exists nation_partitioned (n_name text) " +
+        "partition by column(n_nationkey int4, n_regionkey int4) " +
+        "as select n_name, n_nationkey, n_regionkey from nation");
+    // Add a zero-length data file to each partition directory of nation_partitioned.
+    addEmptyDataFile("nation_partitioned", true);
+  }
+
+  /**
+   * Restores the shared configuration and drops this class's tables. The drops run in a
+   * finally block so they still happen when the shared teardown throws.
+   */
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    try {
+      TestJoinQuery.classTearDown();
+    } finally {
+      client.executeQuery("DROP TABLE IF EXISTS customer_parts PURGE");
+      client.executeQuery("DROP TABLE IF EXISTS nation_partitioned PURGE");
+    }
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartitionTableJoinSmallTable() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testNoProjectionJoinQual() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDown() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testPartialFilterPushDownOuterJoin2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void selfJoinOfPartitionedTable() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest(queries = {
+      @QuerySpec("select a.c_custkey, b.c_custkey from " +
+          " (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          " union all " +
+          " select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+          ") a " +
+          "left outer join customer_parts b " +
+          "on a.c_custkey = b.c_custkey " +
+          "and a.c_nationkey > 0")
+  })
+  public void testPartitionMultiplePartitionFilter() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  public final void testCasebyCase1() throws Exception {
+    // Left outer join between a small table and a large partitioned table where no partition path matches.
+    String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
+    executeString(
+        "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
+            "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
+            "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
+            "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
+            "partition by column(l_orderkey int4) ").close();
+
+    try {
+      // Close the INSERT result set right away; only the SELECT result is inspected.
+      executeString("insert overwrite into " + tableName +
+          " select l_partkey, l_suppkey, l_linenumber, \n" +
+          " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
+          " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
+          " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem").close();
+
+      ResultSet res = executeString(
+          "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
+              "left outer join " + tableName + " b " +
+              "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
+      );
+
+      String expected = "key1,key2\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "1,null\n" +
+          "2,null\n" +
+          "3,null\n" +
+          "3,null\n";
+      assertEquals(expected, resultSetToString(res));
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  // TODO: This test should be reverted after resolving TAJO-1600
+//  @Test
+  public final void testBroadcastMultiColumnPartitionTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
+    ResultSet res = testBase.execute(
+        "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
+    res.close();
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString("insert overwrite into " + tableName
+          + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
+      res.close();
+
+      res = executeString(
+          "select distinct a.col3 from " + tableName + " as a " +
+              "left outer join lineitem b " +
+              "on a.col1 = b.l_orderkey order by a.col3"
+      );
+
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  @Test
+  public final void testSelfJoin() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
+    ResultSet res = executeString(
+        "create table " + tableName + " (n_name text,"
+            + " n_comment text, n_regionkey int8) USING text "
+            + "WITH ('text.delimiter'='|')"
+            + "PARTITION BY column(n_nationkey int8)");
+    res.close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString(
+          "insert overwrite into " + tableName
+              + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
+      res.close();
+
+      // The same self join on the non-partitioned source table provides the expected result.
+      res = executeString(
+          "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+              + " where a.n_nationkey in (1)");
+      String expected = resultSetToString(res);
+      res.close();
+
+      res = executeString(
+          "select a.n_nationkey, a.n_name from " + tableName + " a join " + tableName +
+              " b on a.n_nationkey = b.n_nationkey "
+              + " where a.n_nationkey in (1)");
+      String resultSetData = resultSetToString(res);
+      res.close();
+
+      assertEquals(expected, resultSetData);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  @Test
+  public final void testSelfJoin2() throws Exception {
+    /*
+     https://issues.apache.org/jira/browse/TAJO-1102
+     See the following case.
+     CREATE TABLE orders_partition
+       (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
+        o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|')
+       PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
+
+     select a.o_orderstatus, count(*) as cnt
+      from orders_partition a
+      inner join orders_partition b
+        on a.o_orderdate = b.o_orderdate
+       and a.o_orderstatus = b.o_orderstatus
+       and a.o_orderkey = b.o_orderkey
+     where a.o_orderdate='1995-02-21'
+       and a.o_orderstatus in ('F')
+     group by a.o_orderstatus;
+
+     Because of the where condition [where a.o_orderdate='1995-02-21' and a.o_orderstatus in ('F')],
+     the orders_partition table aliased as a is small and becomes the broadcast target.
+    */
+    String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders");
+    ResultSet res = executeString(
+        "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
+            "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|')\n" +
+            "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)");
+    res.close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    try {
+      res = executeString(
+          "insert overwrite into " + tableName +
+              " select o_orderkey, o_custkey, o_totalprice, " +
+              " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
+              " from orders ");
+      res.close();
+
+      // The equivalent query on the non-partitioned source table provides the expected result.
+      res = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
+              "from orders a " +
+              "join orders b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
+              " order by a.o_orderkey"
+      );
+      String expected = resultSetToString(res);
+      res.close();
+
+      res = executeString(
+          "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
+              "from " + tableName +
+              " a join " + tableName + " b on a.o_orderkey = b.o_orderkey " +
+              "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
+              " order by a.o_orderkey"
+      );
+      String resultSetData = resultSetToString(res);
+      res.close();
+
+      cleanupQuery(res);
+      assertEquals(expected, resultSetData);
+    } finally {
+      executeString("drop table " + tableName + " purge");
+    }
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastPartitionTable() throws Exception {
+    // If all tables participate in the BROADCAST JOIN, some data goes missing.
+    executeDDL("customer_partition_ddl.sql", null);
+
+    // The insert runs inside the try so customer_broad_parts is dropped even if it fails;
+    // otherwise the next parameterized run would find the table already present.
+    try {
+      ResultSet res = executeFile("insert_into_customer_partition.sql");
+      res.close();
+
+      runSimpleTests();
+    } finally {
+      executeString("DROP TABLE customer_broad_parts PURGE");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
new file mode 100644
index 0000000..2fddbfa
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestJoinQuery extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestJoinQuery.class);
+
+  /**
+   * Number of test classes that called {@link #setup()} without a matching {@link #classTearDown()}.
+   * The common tables are created on the first setup and dropped when this returns to zero.
+   */
+  private static int reference = 0;
+
+  /**
+   * Configures every Tajo daemon for one join strategy.
+   *
+   * @param joinOption one of the values from {@link #generateParameters()}. "NoBroadcast" disables
+   *                   broadcast joins, "Hash" raises the hash-join and in-memory hash group-by
+   *                   thresholds to 256 * 1048576, and "Sort" lowers them to 1.
+   */
+  public TestJoinQuery(String joinOption) throws Exception {
+    super(TajoConstants.DEFAULT_DATABASE_NAME, joinOption);
+
+    // Baseline: broadcast join enabled with a small threshold, executor thresholds at their defaults.
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024));
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    if (joinOption.contains("NoBroadcast")) {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "-1");
+    }
+
+    if (joinOption.contains("Hash")) {
+      String largeThreshold = String.valueOf(256 * 1048576);
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, largeThreshold);
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          largeThreshold);
+    }
+    if (joinOption.contains("Sort")) {
+      String tinyThreshold = String.valueOf(1);
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, tinyThreshold);
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          tinyThreshold);
+    }
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {"Hash_NoBroadcast"},
+        {"Sort_NoBroadcast"},
+        {"Hash"},
+        {"Sort"},
+    });
+  }
+
+  /** Creates the common join tables the first time any subclass is set up. */
+  public static void setup() throws Exception {
+    if (reference++ == 0) {
+      createCommonTables();
+    }
+  }
+
+  /**
+   * Restores the join-related configuration to its defaults, and drops the common tables once
+   * the last subclass has finished.
+   */
+  public static void classTearDown() throws SQLException {
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname,
+        ConfVars.$TEST_BROADCAST_JOIN_ENABLED.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname,
+        ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    if (--reference == 0) {
+      dropCommonTables();
+    }
+  }
+
+  protected static void createCommonTables() throws Exception {
+    LOG.info("Create common tables for join tests");
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    String[] data = new String[]{"1|table11-1", "2|table11-2", "3|table11-3", "4|table11-4", "5|table11-5"};
+    TajoTestingCluster.createTable("jointable11", schema, tableOptions, data, 2);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1|table12-1", "2|table12-2"};
+    TajoTestingCluster.createTable("jointable12", schema, tableOptions, data, 2);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"2|table13-2", "3|table13-3"};
+    TajoTestingCluster.createTable("jointable13", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1|table14-1", "2|table14-2", "3|table14-3", "4|table14-4"};
+    TajoTestingCluster.createTable("jointable14", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{};
+    TajoTestingCluster.createTable("jointable15", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    data = new String[]{"1000000|a", "1000001|b", "2|c", "3|d", "4|e"};
+    TajoTestingCluster.createTable("jointable1", schema, tableOptions, data, 1);
+
+    // jointable_large reuses the (id, name) schema of jointable1.
+    data = new String[10000];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i;
+    }
+    TajoTestingCluster.createTable("jointable_large", schema, tableOptions, data, 2);
+
+    // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
+    // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node
+    createMultiFile("nation", 2, new TupleCreator() {
+      public Tuple createTuple(String[] columnDatas) {
+        return new VTuple(new Datum[]{
+            new Int4Datum(Integer.parseInt(columnDatas[0])),
+            new TextDatum(columnDatas[1]),
+            new Int4Datum(Integer.parseInt(columnDatas[2])),
+            new TextDatum(columnDatas[3])
+        });
+      }
+    });
+    addEmptyDataFile("nation_multifile", false);
+  }
+
+  protected static void dropCommonTables() throws SQLException {
+    LOG.info("Clear common tables for join tests");
+
+    client.executeQuery("DROP TABLE IF EXISTS jointable11 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable12 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable13 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable14 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable15 PURGE;");
+    client.executeQuery("DROP TABLE IF EXISTS jointable1 PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS jointable_large PURGE");
+    client.executeQuery("DROP TABLE IF EXISTS nation_multifile PURGE");
+  }
+
+  /** Converts one '|'-split line of a TPC-H data file into a tuple. */
+  interface TupleCreator {
+    Tuple createTuple(String[] columnDatas);
+  }
+
+  /**
+   * Renders the root columns of an existing table as a DDL column list such as
+   * {@code "a INT4,b TEXT"}, appending {@code (length)} when a column's type has a positive length.
+   * An empty schema yields an empty string.
+   */
+  private static String buildSchemaString(String tableName) throws TajoException {
+    TableDesc desc = client.getTableDesc(tableName);
+    StringBuilder sb = new StringBuilder();
+    for (Column column : desc.getSchema().getRootColumns()) {
+      // Prepend the separator so an empty schema never needs a trailing-comma removal.
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      TajoDataTypes.DataType dataType = column.getDataType();
+      sb.append(column.getSimpleName()).append(" ").append(dataType.getType());
+      if (dataType.getLength() > 0) {
+        sb.append("(").append(dataType.getLength()).append(")");
+      }
+    }
+    return sb.toString();
+  }
+
+  /** Builds a CREATE TABLE statement for {@code <tableName>_multifile} with the same columns as tableName. */
+  private static String buildMultifileDDlString(String tableName) throws TajoException {
+    String multiTableName = tableName + "_multifile";
+    StringBuilder sb = new StringBuilder("create table ").append(multiTableName).append(" (");
+    sb.append(buildSchemaString(tableName)).append(" )");
+    return sb.toString();
+  }
+
+  /**
+   * Locates the TPC-H data file for tableName. Tries the module-relative path first, then
+   * repository-root-relative paths for the tajo-core-tests and tajo-core modules. If none
+   * exists, the last candidate is returned and the subsequent read fails as before.
+   * NOTE(review): confirm which module hosts src/test/tpch after this module split.
+   */
+  private static File findTpchDataFile(String tableName) {
+    String fileName = tableName + ".tbl";
+    File file = new File("src/test/tpch/" + fileName);
+    if (!file.exists()) {
+      file = new File(System.getProperty("user.dir") + "/tajo-core-tests/src/test/tpch/" + fileName);
+    }
+    if (!file.exists()) {
+      file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + fileName);
+    }
+    return file;
+  }
+
+  /**
+   * Creates {@code <tableName>_multifile} with tableName's schema and fills it from the TPC-H
+   * data file, writing a new data file every numRowsEachFile rows.
+   *
+   * @throws IllegalArgumentException if numRowsEachFile is not positive
+   */
+  protected static void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception {
+    if (numRowsEachFile <= 0) {
+      throw new IllegalArgumentException("numRowsEachFile must be positive: " + numRowsEachFile);
+    }
+
+    // make multiple small files
+    String multiTableName = tableName + "_multifile";
+    String sql = buildMultifileDDlString(tableName);
+    client.executeQueryAndGetResult(sql);
+
+    TableDesc table = client.getTableDesc(multiTableName);
+    assertNotNull(table);
+
+    TableMeta tableMeta = table.getMeta();
+    Schema schema = table.getLogicalSchema();
+
+    String[] rows = FileUtil.readTextFile(findTpchDataFile(tableName)).split("\n");
+    assertTrue(rows.length > 0);
+
+    int fileIndex = 0;
+    Appender appender = null;
+    try {
+      for (int i = 0; i < rows.length; i++) {
+        if (i % numRowsEachFile == 0) {
+          if (appender != null) {
+            appender.flush();
+            appender.close();
+            // Cleared so the finally block does not close it a second time if opening the next one fails.
+            appender = null;
+          }
+          Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv");
+          fileIndex++;
+          appender = (((FileTablespace) TablespaceManager.getLocalFs()))
+              .getAppender(tableMeta, schema, dataPath);
+          appender.init();
+        }
+        String[] columnDatas = rows[i].split("\\|");
+        Tuple tuple = tupleCreator.createTuple(columnDatas);
+        appender.addTuple(tuple);
+      }
+      appender.flush();
+    } finally {
+      // Release the current appender even when reading or writing a row fails.
+      if (appender != null) {
+        appender.close();
+      }
+    }
+  }
+
+  /**
+   * Writes a zero-length file named {@code 0_empty.csv} into the table directory, or into every
+   * leaf partition directory when isPartitioned is true.
+   */
+  protected static void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception {
+    TableDesc table = client.getTableDesc(tableName);
+
+    Path path = new Path(table.getUri());
+    FileSystem fs = path.getFileSystem(conf);
+    List<Path> targetDirs = isPartitioned ? getPartitionPathList(fs, path) : Arrays.asList(path);
+    for (Path eachPath : targetDirs) {
+      Path dataPath = new Path(eachPath, 0 + "_empty.csv");
+      try (OutputStream out = fs.create(dataPath)) {
+        // Nothing to write: the file is intentionally empty.
+      }
+    }
+  }
+
+  /**
+   * Recursively collects the directories under path that directly contain a file. Once a file
+   * is seen in a directory, that directory is added and its remaining entries are not visited.
+   */
+  protected static List<Path> getPartitionPathList(FileSystem fs, Path path) throws Exception {
+    FileStatus[] files = fs.listStatus(path);
+    List<Path> paths = new ArrayList<Path>();
+    if (files != null) {
+      for (FileStatus eachFile : files) {
+        if (eachFile.isFile()) {
+          paths.add(path);
+          return paths;
+        } else {
+          paths.addAll(getPartitionPathList(fs, eachFile.getPath()));
+        }
+      }
+    }
+
+    return paths;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
new file mode 100644
index 0000000..d3cde3d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.SQLException;
+
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestMultipleJoinTypes extends TestJoinQuery {
+
+ public TestMultipleJoinTypes(String joinOption) throws Exception {
+ super(joinOption);
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ TestJoinQuery.setup();
+ }
+
+ @AfterClass
+ public static void classTearDown() throws SQLException {
+ TestJoinQuery.classTearDown();
+ }
+
+ @Test
+ @QueryTestCaseBase.Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @QueryTestCaseBase.SimpleTest()
+ public final void testJoinWithMultipleJoinTypes() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public void testComplexJoinsWithCaseWhen() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public void testComplexJoinsWithCaseWhen2() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest(prepare = {
+ "CREATE TABLE customer_broad_parts (" +
+ " c_nationkey INT4," +
+ " c_name TEXT," +
+ " c_address TEXT," +
+ " c_phone TEXT," +
+ " c_acctbal FLOAT8," +
+ " c_mktsegment TEXT," +
+ " c_comment TEXT" +
+ ") PARTITION BY COLUMN (c_custkey INT4)",
+ "INSERT OVERWRITE INTO customer_broad_parts" +
+ " SELECT" +
+ " c_nationkey," +
+ " c_name," +
+ " c_address," +
+ " c_phone," +
+ " c_acctbal," +
+ " c_mktsegment," +
+ " c_comment," +
+ " c_custkey" +
+ " FROM customer"
+ }, cleanup = {
+ "DROP TABLE customer_broad_parts PURGE"
+ }, queries = {
+ @QuerySpec("select a.l_orderkey, b.o_orderkey, c.c_custkey from lineitem a " +
+ "inner join orders b on a.l_orderkey = b.o_orderkey " +
+ "left outer join customer_broad_parts c on a.l_orderkey = c.c_custkey and c.c_custkey < 0")
+ })
+ public final void testInnerAndOuterWithEmpty() throws Exception {
+ runSimpleTests();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
new file mode 100644
index 0000000..bd8f830
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+/**
+ * Query tests over table1 and table2 involving their "addr" column (presumably a network-address
+ * type, judging by the class name; confirm against table1_ddl.sql / table2_ddl.sql). Every test
+ * is a no-op when the Hive catalog store is running.
+ */
+public class TestNetTypes extends QueryTestCaseBase {
+
+  /** Creates table1 and table2 before each test, unless the Hive catalog store is running. */
+  @Before
+  public final void setUp() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+    executeDDL("table1_ddl.sql", "table1");
+    executeDDL("table2_ddl.sql", "table2");
+  }
+
+  @Test
+  public final void testSelect() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select name, addr from table1;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+
+  @Test
+  public final void testGroupby() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select name, addr, count(1) from table1 group by name, addr;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+
+  @Test
+  public final void testGroupby2() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select addr, count(*) from table1 group by addr;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+
+  @Test
+  public final void testSort() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select * from table1 order by addr;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+
+  @Test
+  public final void testSort2() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select addr from table2 order by addr;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+
+  @Test
+  public final void testJoin() throws Exception {
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return; // not supported with HiveCatalogStore
+    }
+    // Query: select * from table1 as t1, table2 as t2 where t1.addr = t2.addr;
+    ResultSet result = executeQuery();
+    assertResultSet(result);
+    cleanupQuery(result);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
new file mode 100644
index 0000000..66848e6
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for SQL NULL handling: IS NULL / IS NOT NULL predicates over text tables,
+ * and how NULL values read back through the JDBC {@link ResultSet} getters.
+ *
+ * <p>These tests need specialized data sets, so they are kept separate from the other unit
+ * tests that use the TPC-H data set.
+ */
+@Category(IntegrationTest.class)
+public class TestNullValues {
+
+  /** Client shared by all tests, created from the TPC-H testing cluster in {@link #setUp()}. */
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    client = TpchTestBase.getInstance().getTestingCluster().newTajoClient();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // setUp() may have failed before the client was assigned; don't mask that with an NPE.
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  /** Checks that only the row whose FLOAT4 col3 field is empty matches {@code col3 is null}. */
+  @Test
+  public final void testIsNull() throws Exception {
+    String[] table = new String[] {"nulltable1"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    schema.addColumn("col3", Type.FLOAT4);
+    Schema[] schemas = new Schema[] {schema};
+    String[] data = {
+        "1|filled|0.1",
+        "2||",
+        "3|filled|0.2"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable1 where col3 is null", client);
+
+    try {
+      assertTrue(res.next());
+      assertEquals(2, res.getInt(1));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** Checks that {@code col1 is not null} filters out the row whose col1 field is empty. */
+  @Test
+  public final void testIsNotNull() throws Exception {
+    String[] table = new String[] {"nulltable2"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    Schema[] schemas = new Schema[] {schema};
+    String[] data = {
+        "1|filled|",
+        "||",
+        "3|filled|"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable2 where col1 is not null", client);
+    try {
+      assertTrue(res.next());
+      assertEquals(1, res.getInt(1));
+      assertTrue(res.next());
+      assertEquals(3, res.getInt(1));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** Checks IS NULL on several leading empty INT8 fields combined with an equality predicate. */
+  @Test
+  public final void testIsNotNull2() throws Exception {
+    String[] table = new String[] {"nulltable3"};
+    Schema[] schemas = new Schema[] {newInt8Schema(10)};
+    String[] data = {
+        ",,,,672287821,1301460,1,313895860387,126288907,1024",
+        ",,,43578,19,13,6,3581,2557,1024"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, ",");
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable3 where col1 is null and col2 is null and col3 is null and col4 = 43578", client);
+    try {
+      assertTrue(res.next());
+      assertEquals(43578, res.getLong(4));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /**
+   * Like {@link #testIsNotNull2()}, but with a custom NULL marker (TEXT_NULL) and rows that have
+   * fewer fields than the schema has columns.
+   */
+  @Test
+  public final void testIsNotNull3() throws Exception {
+    String[] table = new String[] {"nulltable4"};
+    Schema[] schemas = new Schema[] {newInt8Schema(10)};
+    String[] data = {
+        "\\N,,,,672287821,",
+        ",\\N,,43578"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, ",");
+    opts.set(StorageConstants.TEXT_NULL, "\\\\N");
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable4 where col1 is null and col2 is null and col3 is null and col5 is null and col4 = 43578"
+            , client);
+    try {
+      assertTrue(res.next());
+      assertEquals(43578, res.getLong(4));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** Checks how NULL columns render and read back when selected directly. */
+  @Test
+  public final void testResultSetNullSimpleQuery() throws Exception {
+    String tableName = "nulltable5";
+    String expected =
+        "null|a|1.0|true\n" +
+        "2|null|2.0|false\n" +
+        "3|c|null|true\n" +
+        "4|d|4.0|null";
+
+    ResultSet res = null;
+    try {
+      res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client);
+      String result = readRowsCheckingNulls(res, 4, new int[]{1, 2, 3, 4}, 4);
+      assertEquals(expected, result);
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /** Checks NULL columns alongside their coalesce() replacements in the same result. */
+  @Test
+  public final void testResultSetNull() throws Exception {
+    String tableName = "nulltable6";
+    String query = "select " +
+        "col1, coalesce(col1, 99999), " +
+        "col2, coalesce(col2, 'null_value'), " +
+        "col3, coalesce(col3, 99999.0)," +
+        "col4 " +
+        "from " + tableName;
+    String expected =
+        "null|99999|a|a|1.0|1.0|true\n" +
+        "2|2|null|null_value|2.0|2.0|false\n" +
+        "3|3|c|c|null|99999.0|true\n" +
+        "4|4|d|d|4.0|4.0|null";
+
+    ResultSet res = null;
+    try {
+      res = runNullTableQuery(tableName, query, client);
+      // The raw (possibly NULL) columns sit at odd positions; coalesce() results are in between.
+      String result = readRowsCheckingNulls(res, 7, new int[]{1, 3, 5, 7}, 4);
+      assertEquals(expected, result);
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /** Builds a schema of {@code numColumns} INT8 columns named col1 .. col{numColumns}. */
+  private static Schema newInt8Schema(int numColumns) {
+    Schema schema = new Schema();
+    for (int i = 1; i <= numColumns; i++) {
+      schema.addColumn("col" + i, Type.INT8);
+    }
+    return schema;
+  }
+
+  /**
+   * Creates {@code tableName} from a four-row fixture (INT4, TEXT, FLOAT4, BOOLEAN) in which
+   * row i (0-based) holds NULL, written as {@code \N}, in column i + 1, and runs {@code query}.
+   * When {@code client} is null, the client-less TajoTestingCluster.run overload is used.
+   */
+  private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception {
+    String[] table = new String[] {tableName};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    schema.addColumn("col3", Type.FLOAT4);
+    schema.addColumn("col4", Type.BOOLEAN);
+    Schema[] schemas = new Schema[] {schema};
+    String[] data = {
+        "\\N|a|1.0|t",
+        "2|\\N|2.0|f",
+        "3|c|\\N|t",
+        "4|d|4.0|\\N"
+    };
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    if (client == null) {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query);
+    } else {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query, client);
+    }
+  }
+
+  /**
+   * Consumes all rows of {@code res}. Each row is rendered as its getObject values joined by '|'
+   * (NULL renders as "null"), and rows are joined by a newline. On every row,
+   * {@link #assertResultSetNull} runs twice, once by column index and once by column name.
+   *
+   * @param res result set positioned before its first row
+   * @param numColumns number of columns to render per row
+   * @param nullIndex 1-based column of the NULL value for each of the first four rows
+   * @param expectedNumRows number of rows the result must contain
+   * @return the rendered rows
+   */
+  private String readRowsCheckingNulls(ResultSet res, int numColumns, int[] nullIndex,
+      int expectedNumRows) throws SQLException {
+    StringBuilder result = new StringBuilder();
+    int numRows = 0;
+    while (res.next()) {
+      if (numRows > 0) {
+        result.append('\n');
+      }
+      for (int i = 1; i <= numColumns; i++) {
+        if (i > 1) {
+          result.append('|');
+        }
+        result.append(res.getObject(i));
+      }
+
+      assertResultSetNull(res, numRows, false, nullIndex);
+      assertResultSetNull(res, numRows, true, nullIndex);
+      numRows++;
+    }
+    assertEquals(expectedNumRows, numRows);
+    return result.toString();
+  }
+
+  /**
+   * Checks that the current row's NULL column reads back as the JDBC default for the getter used
+   * on that row: row 0 getInt == 0, row 1 getString == null, row 2 getDouble == 0.0,
+   * row 3 getBoolean == false. Rows outside 0..3 are not checked.
+   *
+   * @param res result set positioned on a row
+   * @param numRows 0-based index of the current row
+   * @param useName if true, look the column up by its name from the result metadata;
+   *     otherwise by its index
+   * @param nullIndex 1-based column of the NULL value for each of the first four rows
+   */
+  private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException {
+    if (numRows < 0 || numRows > 3) {
+      return;
+    }
+    int column = nullIndex[numRows];
+    String columnName = useName ? res.getMetaData().getColumnName(column) : null;
+
+    switch (numRows) {
+      case 0:
+        assertEquals(0, useName ? res.getInt(columnName) : res.getInt(column));
+        break;
+      case 1:
+        assertNull(useName ? res.getString(columnName) : res.getString(column));
+        break;
+      case 2:
+        // JDBC specifies exactly 0 for SQL NULL, so no tolerance is needed.
+        assertEquals(0.0, useName ? res.getDouble(columnName) : res.getDouble(column), 0.0);
+        break;
+      default:
+        assertFalse(useName ? res.getBoolean(columnName) : res.getBoolean(column));
+        break;
+    }
+  }
+}