You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/19 04:08:10 UTC
[45/47] tajo git commit: TAJO-1577: Add test cases to verify join
plans. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
new file mode 100644
index 0000000..a8e2a3b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestInnerJoinWithSubQuery extends TestJoinQuery {
+
+ public TestInnerJoinWithSubQuery(String joinOption) throws Exception {
+ super(joinOption);
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ TestJoinQuery.setup();
+ }
+
+ @AfterClass
+ public static void classTearDown() throws ServiceException {
+ TestJoinQuery.classTearDown();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public final void testJoinWithMultipleJoinQual2() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public final void testJoinWithMultipleJoinQual3() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public final void testJoinWithMultipleJoinQual4() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ public final void testJoinWithJson2() throws Exception {
+ /*
+ select t.n_nationkey, t.n_name, t.n_regionkey, t.n_comment, ps.ps_availqty, s.s_suppkey
+ from (
+ select n_nationkey, n_name, n_regionkey, n_comment
+ from nation n
+ join region r on (n.n_regionkey = r.r_regionkey)
+ ) t
+ join supplier s on (s.s_nationkey = t.n_nationkey)
+ join partsupp ps on (s.s_suppkey = ps.ps_suppkey)
+ where t.n_name in ('ARGENTINA','ETHIOPIA', 'MOROCCO');
+ */
+ ResultSet res = executeJsonQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public void testComplexJoinCondition5() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public void testComplexJoinCondition6() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public void testComplexJoinCondition7() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public final void testBroadcastSubquery() throws Exception {
+ runSimpleTests();
+ }
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @SimpleTest()
+ public final void testBroadcastSubquery2() throws Exception {
+ runSimpleTests();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
deleted file mode 100644
index 8387abd..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ /dev/null
@@ -1,850 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.Int4Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.jdbc.FetchResultSet;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.worker.TajoWorker;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.File;
-import java.io.OutputStream;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.junit.Assert.*;
-
-@Category(IntegrationTest.class)
-public class TestJoinBroadcast extends QueryTestCaseBase {
- public TestJoinBroadcast() throws Exception {
- super(TajoConstants.DEFAULT_DATABASE_NAME);
- testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
- testingCluster.setAllTajoDaemonConfValue(
- TajoConf.ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024));
-
- executeDDL("create_lineitem_large_ddl.sql", "lineitem_large");
- executeDDL("create_customer_large_ddl.sql", "customer_large");
- executeDDL("create_orders_large_ddl.sql", "orders_large");
- }
-
- @Test
- public final void testCrossJoin() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin2() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin3() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin4() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin5() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testWhereClauseJoin6() throws Exception {
- ResultSet res = executeQuery();
- System.out.println(resultSetToString(res));
- cleanupQuery(res);
- }
-
- @Test
- public final void testTPCHQ2Join() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoin1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoin2() throws Exception {
- // large, large, small, small
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoin3() throws Exception {
- // large, large, small, large, small, small
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithConstantExpr1() throws Exception {
- // outer join with constant projections
- //
- // select c_custkey, orders.o_orderkey, 'val' as val from customer
- // left outer join orders on c_custkey = o_orderkey;
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithConstantExpr2() throws Exception {
- // outer join with constant projections
- //
- // select c_custkey, o.o_orderkey, 'val' as val from customer left outer join
- // (select * from orders) o on c_custkey = o.o_orderkey
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithConstantExpr3() throws Exception {
- // outer join with constant projections
- //
- // select a.c_custkey, 123::INT8 as const_val, b.min_name from customer a
- // left outer join ( select c_custkey, min(c_name) as min_name from customer group by c_custkey) b
- // on a.c_custkey = b.c_custkey;
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testRightOuterJoin1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testFullOuterJoin1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testJoinCoReferredEvals1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testJoinCoReferredEvalsWithSameExprs1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testJoinCoReferredEvalsWithSameExprs2() throws Exception {
- // including grouping operator
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testCrossJoinAndCaseWhen() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testCrossJoinWithAsterisk1() throws Exception {
- // select region.*, customer.* from region, customer;
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testCrossJoinWithAsterisk2() throws Exception {
- // select region.*, customer.* from customer, region;
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testCrossJoinWithAsterisk3() throws Exception {
- // select * from customer, region
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public void testCrossJoinWithAsterisk4() throws Exception {
- // select length(r_regionkey), *, c_custkey*10 from customer, region
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testInnerJoinWithEmptyTable() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithEmptyTable1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithEmptyTable2() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithEmptyTable3() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testLeftOuterJoinWithEmptyTable4() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testRightOuterJoinWithEmptyTable1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testFullOuterJoinWithEmptyTable1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testCrossJoinWithEmptyTable1() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testJoinOnMultipleDatabases() throws Exception {
- executeString("CREATE DATABASE JOINS");
- assertDatabaseExists("joins");
- executeString("CREATE TABLE JOINS.part_ as SELECT * FROM part");
- assertTableExists("joins.part_");
- executeString("CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier");
- assertTableExists("joins.supplier_");
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- executeString("DROP TABLE JOINS.part_ PURGE");
- executeString("DROP TABLE JOINS.supplier_ PURGE");
- executeString("DROP DATABASE JOINS");
- }
-
- private MasterPlan getQueryPlan(QueryId queryId) {
- for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
- QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
- if (queryMasterTask != null) {
- return queryMasterTask.getQuery().getPlan();
- }
- }
-
- fail("Can't find query from workers" + queryId);
- return null;
- }
-
- @Test
- public final void testBroadcastBasicJoin() throws Exception {
- ResultSet res = executeQuery();
- assertEquals(FetchResultSet.class, res.getClass());
- FetchResultSet resultSet = (FetchResultSet)res;
- assertResultSet(res);
- cleanupQuery(res);
-
- MasterPlan plan = getQueryPlan(resultSet.getQueryId());
- ExecutionBlock rootEB = plan.getRoot();
-
- /*
- |-eb_1395998037360_0001_000006
- |-eb_1395998037360_0001_000005
- */
- assertEquals(1, plan.getChildCount(rootEB.getId()));
-
- ExecutionBlock firstEB = plan.getChild(rootEB.getId(), 0);
-
- assertNotNull(firstEB);
- assertEquals(2, firstEB.getBroadcastTables().size());
- assertTrue(firstEB.getBroadcastTables().contains("default.supplier"));
- assertTrue(firstEB.getBroadcastTables().contains("default.part"));
- }
-
- @Test
- public final void testBroadcastTwoPartJoin() throws Exception {
- ResultSet res = executeQuery();
- assertEquals(FetchResultSet.class, res.getClass());
- FetchResultSet resultSet = (FetchResultSet)res;
-
- assertResultSet(res);
- cleanupQuery(res);
-
- MasterPlan plan = getQueryPlan(resultSet.getQueryId());
- ExecutionBlock rootEB = plan.getRoot();
-
- /*
- |-eb_1395996354406_0001_000010
- |-eb_1395996354406_0001_000009
- |-eb_1395996354406_0001_000008
- |-eb_1395996354406_0001_000005
- */
- assertEquals(1, plan.getChildCount(rootEB.getId()));
-
- ExecutionBlock firstJoinEB = plan.getChild(rootEB.getId(), 0);
- assertNotNull(firstJoinEB);
- assertEquals(NodeType.JOIN, firstJoinEB.getPlan().getType());
- assertEquals(0, firstJoinEB.getBroadcastTables().size());
-
- ExecutionBlock leafEB1 = plan.getChild(firstJoinEB.getId(), 0);
- assertTrue(leafEB1.getBroadcastTables().contains("default.orders"));
- assertTrue(leafEB1.getBroadcastTables().contains("default.part"));
-
- ExecutionBlock leafEB2 = plan.getChild(firstJoinEB.getId(), 1);
- assertTrue(leafEB2.getBroadcastTables().contains("default.nation"));
- }
-
- @Test
- public final void testBroadcastSubquery() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testBroadcastSubquery2() throws Exception {
- ResultSet res = executeQuery();
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testBroadcastPartitionTable() throws Exception {
- // If all tables participate in the BROADCAST JOIN, there is some missing data.
- executeDDL("customer_partition_ddl.sql", null);
- ResultSet res = executeFile("insert_into_customer_partition.sql");
- res.close();
-
- createMultiFile("nation", 2, new TupleCreator() {
- public Tuple createTuple(String[] columnDatas) {
- return new VTuple(new Datum[]{
- new Int4Datum(Integer.parseInt(columnDatas[0])),
- new TextDatum(columnDatas[1]),
- new Int4Datum(Integer.parseInt(columnDatas[2])),
- new TextDatum(columnDatas[3])
- });
- }
- });
-
- createMultiFile("orders", 1, new TupleCreator() {
- public Tuple createTuple(String[] columnDatas) {
- return new VTuple(new Datum[]{
- new Int4Datum(Integer.parseInt(columnDatas[0])),
- new Int4Datum(Integer.parseInt(columnDatas[1])),
- new TextDatum(columnDatas[2])
- });
- }
- });
-
- res = executeQuery();
- assertResultSet(res);
- res.close();
-
- executeString("DROP TABLE customer_broad_parts PURGE");
- executeString("DROP TABLE nation_multifile PURGE");
- executeString("DROP TABLE orders_multifile PURGE");
- }
-
- @Test
- public final void testBroadcastMultiColumnPartitionTable() throws Exception {
- String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
- ResultSet res = testBase.execute(
- "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
- res.close();
- TajoTestingCluster cluster = testBase.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- res = executeString("insert overwrite into " + tableName
- + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
- res.close();
-
- res = executeString(
- "select distinct a.col3 from " + tableName + " as a " +
- "left outer join lineitem_large b " +
- "on a.col1 = b.l_orderkey order by a.col3"
- );
-
- assertResultSet(res);
- cleanupQuery(res);
- }
-
- @Test
- public final void testCasebyCase1() throws Exception {
- // Left outer join with a small table and a large partition table which not matched any partition path.
- String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
- testBase.execute(
- "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
- "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
- "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
- "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
- "partition by column(l_orderkey int4) ").close();
- TajoTestingCluster cluster = testBase.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- executeString("insert overwrite into " + tableName +
- " select l_partkey, l_suppkey, l_linenumber, \n" +
- " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
- " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
- " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem_large");
-
- ResultSet res = executeString(
- "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
- "left outer join " + tableName + " b " +
- "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
- );
-
- String expected = "key1,key2\n" +
- "-------------------------------\n" +
- "1,null\n" +
- "1,null\n" +
- "2,null\n" +
- "3,null\n" +
- "3,null\n";
-
- try {
- assertEquals(expected, resultSetToString(res));
- } finally {
- cleanupQuery(res);
- }
- }
-
- @Test
- public final void testInnerAndOuterWithEmpty() throws Exception {
- executeDDL("customer_partition_ddl.sql", null);
- executeFile("insert_into_customer_partition.sql").close();
-
- // outer join table is empty
- ResultSet res = executeString(
- "select a.l_orderkey, b.o_orderkey, c.c_custkey from lineitem a " +
- "inner join orders b on a.l_orderkey = b.o_orderkey " +
- "left outer join customer_broad_parts c on a.l_orderkey = c.c_custkey and c.c_custkey < 0"
- );
-
- String expected = "l_orderkey,o_orderkey,c_custkey\n" +
- "-------------------------------\n" +
- "1,1,null\n" +
- "1,1,null\n" +
- "2,2,null\n" +
- "3,3,null\n" +
- "3,3,null\n";
-
- assertEquals(expected, resultSetToString(res));
- res.close();
-
- executeString("DROP TABLE customer_broad_parts PURGE").close();
- }
-
- static interface TupleCreator {
- public Tuple createTuple(String[] columnDatas);
- }
-
- private void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception {
- // make multiple small file
- String multiTableName = tableName + "_multifile";
- executeDDL(multiTableName + "_ddl.sql", null);
-
- TableDesc table = client.getTableDesc(multiTableName);
- assertNotNull(table);
-
- TableMeta tableMeta = table.getMeta();
- Schema schema = table.getLogicalSchema();
-
- File file = new File("src/test/tpch/" + tableName + ".tbl");
-
- if (!file.exists()) {
- file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl");
- }
- String[] rows = FileUtil.readTextFile(file).split("\n");
-
- assertTrue(rows.length > 0);
-
- int fileIndex = 0;
-
- Appender appender = null;
- for (int i = 0; i < rows.length; i++) {
- if (i % numRowsEachFile == 0) {
- if (appender != null) {
- appender.flush();
- appender.close();
- }
- Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
- fileIndex++;
- appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
- .getAppender(tableMeta, schema, dataPath);
- appender.init();
- }
- String[] columnDatas = rows[i].split("\\|");
- Tuple tuple = tupleCreator.createTuple(columnDatas);
- appender.addTuple(tuple);
- }
- appender.flush();
- appender.close();
- }
-
- @Test
- public final void testLeftOuterJoinLeftSideSmallTable() throws Exception {
- KeyValueSet tableOptions = new KeyValueSet();
- tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
- tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("name", Type.TEXT);
- String[] data = new String[]{ "1000000|a", "1000001|b", "2|c", "3|d", "4|e" };
- TajoTestingCluster.createTable("table1", schema, tableOptions, data, 1);
-
- data = new String[10000];
- for (int i = 0; i < data.length; i++) {
- data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i;
- }
- TajoTestingCluster.createTable("table_large", schema, tableOptions, data, 2);
-
- try {
- ResultSet res = executeString(
- "select a.id, b.name from table1 a left outer join table_large b on a.id = b.id order by a.id"
- );
-
- String expected = "id,name\n" +
- "-------------------------------\n" +
- "2,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable2\n" +
- "3,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable3\n" +
- "4,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable4\n" +
- "1000000,null\n" +
- "1000001,null\n";
-
- assertEquals(expected, resultSetToString(res));
-
- cleanupQuery(res);
- } finally {
- executeString("DROP TABLE table1 PURGE").close();
- executeString("DROP TABLE table_large PURGE").close();
- }
- }
-
-
- @Test
- public final void testSelfJoin() throws Exception {
- String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
- ResultSet res = executeString(
- "create table " + tableName + " (n_name text,"
- + " n_comment text, n_regionkey int8) USING csv "
- + "WITH ('csvfile.delimiter'='|')"
- + "PARTITION BY column(n_nationkey int8)");
- res.close();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- res = executeString(
- "insert overwrite into " + tableName
- + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
- res.close();
-
- res = executeString(
- "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
- + " where a.n_nationkey in (1)");
- String expected = resultSetToString(res);
- res.close();
-
- res = executeString(
- "select a.n_nationkey, a.n_name from " + tableName + " a join "+tableName +
- " b on a.n_nationkey = b.n_nationkey "
- + " where a.n_nationkey in (1)");
- String resultSetData = resultSetToString(res);
- res.close();
-
- assertEquals(expected, resultSetData);
-
- }
-
- @Test
- public final void testSelfJoin2() throws Exception {
- /*
- https://issues.apache.org/jira/browse/TAJO-1102
- See the following case.
- CREATE TABLE orders_partition
- (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
- o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')
- PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
-
- select a.o_orderstatus, count(*) as cnt
- from orders_partition a
- inner join orders_partition b
- on a.o_orderdate = b.o_orderdate
- and a.o_orderstatus = b.o_orderstatus
- and a.o_orderkey = b.o_orderkey
- where a.o_orderdate='1995-02-21'
- and a.o_orderstatus in ('F')
- group by a.o_orderstatus;
-
- Because of the where condition[where a.o_orderdate='1995-02-21 and a.o_orderstatus in ('F')],
- orders_partition table aliased a is small and broadcast target.
- */
- String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders_large");
- ResultSet res = executeString(
- "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
- "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')\n" +
- "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)");
- res.close();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- res = executeString(
- "insert overwrite into " + tableName +
- " select o_orderkey, o_custkey, o_totalprice, " +
- " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
- " from orders_large ");
- res.close();
-
- res = executeString(
- "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
- "from orders_large a " +
- "join orders_large b on a.o_orderkey = b.o_orderkey " +
- "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
- " order by a.o_orderkey"
- );
- String expected = resultSetToString(res);
- res.close();
-
- res = executeString(
- "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
- "from " + tableName +
- " a join "+ tableName + " b on a.o_orderkey = b.o_orderkey " +
- "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
- " order by a.o_orderkey"
- );
- String resultSetData = resultSetToString(res);
- res.close();
-
- assertEquals(expected, resultSetData);
-
- }
- @Test
- public void testMultipleBroadcastDataFileWithZeroLength() throws Exception {
- // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
- // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node
- createMultiFile("nation", 2, new TupleCreator() {
- public Tuple createTuple(String[] columnDatas) {
- return new VTuple(new Datum[]{
- new Int4Datum(Integer.parseInt(columnDatas[0])),
- new TextDatum(columnDatas[1]),
- new Int4Datum(Integer.parseInt(columnDatas[2])),
- new TextDatum(columnDatas[3])
- });
- }
- });
- addEmptyDataFile("nation_multifile", false);
-
- ResultSet res = executeQuery();
-
- assertResultSet(res);
- cleanupQuery(res);
-
- executeString("DROP TABLE nation_multifile PURGE");
- }
-
- @Test
- public void testMultipleBroadcastDataFileWithZeroLength2() throws Exception {
- // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
- // testMultipleBroadcastDataFileWithZeroLength2 testcase is for the non-leaf node
- createMultiFile("nation", 2, new TupleCreator() {
- public Tuple createTuple(String[] columnDatas) {
- return new VTuple(new Datum[]{
- new Int4Datum(Integer.parseInt(columnDatas[0])),
- new TextDatum(columnDatas[1]),
- new Int4Datum(Integer.parseInt(columnDatas[2])),
- new TextDatum(columnDatas[3])
- });
- }
- });
- addEmptyDataFile("nation_multifile", false);
-
- ResultSet res = executeQuery();
-
- assertResultSet(res);
- cleanupQuery(res);
-
- executeString("DROP TABLE nation_multifile PURGE");
- }
-
- @Test
- public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
- String tableName = CatalogUtil.normalizeIdentifier("nation_partitioned");
- ResultSet res = testBase.execute(
- "create table " + tableName + " (n_name text) partition by column(n_nationkey int4, n_regionkey int4) ");
- res.close();
- TajoTestingCluster cluster = testBase.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- res = executeString("insert overwrite into " + tableName
- + " select n_name, n_nationkey, n_regionkey from nation");
- res.close();
-
- addEmptyDataFile("nation_partitioned", true);
-
- res = executeQuery();
-
- assertResultSet(res);
- cleanupQuery(res);
-
- executeString("DROP TABLE nation_partitioned PURGE");
- }
-
- @Test
- public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
- String tableName = CatalogUtil.normalizeIdentifier("nation_partitioned");
- ResultSet res = testBase.execute(
- "create table " + tableName + " (n_name text) partition by column(n_nationkey int4, n_regionkey int4) ");
- res.close();
- TajoTestingCluster cluster = testBase.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-
- res = executeString("insert overwrite into " + tableName
- + " select n_name, n_nationkey, n_regionkey from nation");
- res.close();
-
- addEmptyDataFile("nation_partitioned", true);
-
- res = executeQuery();
-
- assertResultSet(res);
- cleanupQuery(res);
-
- executeString("DROP TABLE nation_partitioned PURGE");
- }
-
- private void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception {
- TableDesc table = client.getTableDesc(tableName);
-
- Path path = new Path(table.getPath());
- FileSystem fs = path.getFileSystem(conf);
- if (isPartitioned) {
- List<Path> partitionPathList = getPartitionPathList(fs, path);
- for (Path eachPath: partitionPathList) {
- Path dataPath = new Path(eachPath, 0 + "_empty.csv");
- OutputStream out = fs.create(dataPath);
- out.close();
- }
- } else {
- Path dataPath = new Path(path, 0 + "_empty.csv");
- OutputStream out = fs.create(dataPath);
- out.close();
- }
- }
-
- private List<Path> getPartitionPathList(FileSystem fs, Path path) throws Exception {
- FileStatus[] files = fs.listStatus(path);
- List<Path> paths = new ArrayList<Path>();
- if (files != null) {
- for (FileStatus eachFile: files) {
- if (eachFile.isFile()) {
- paths.add(path);
- return paths;
- } else {
- paths.addAll(getPartitionPathList(fs, eachFile.getPath()));
- }
- }
- }
-
- return paths;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4b1b7799/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
index 34ead13..da0f59d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -18,89 +18,322 @@
package org.apache.tajo.engine.query;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.sql.ResultSet;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
+/*
+ * NOTE: Plan tests are disabled in TestJoinOnPartitionedTables.
+ * A plan that reads a partitioned table currently contains the HDFS paths of its input partitions.
+ * An example of such a path is hdfs://localhost:60305/tajo/warehouse/default/customer_parts/c_nationkey=1.
+ * Because a different HDFS port is used for each test run, these paths change between runs, which makes it difficult to test query plans that read partitioned tables.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestJoinOnPartitionedTables extends TestJoinQuery {
+
+ /**
+ * Creates one instance of this parameterized test class.
+ *
+ * @param joinOption the join option for this run; passed unchanged to the TestJoinQuery constructor
+ */
+ public TestJoinOnPartitionedTables(String joinOption) throws Exception {
+ super(joinOption);
+ }
+
+ /**
+ * Class-level fixture. Runs the TestJoinQuery setup first, then creates the two partitioned
+ * tables used by the tests of this class: customer_parts (partitioned by c_nationkey, filled
+ * from customer) and nation_partitioned (partitioned by n_nationkey and n_regionkey, filled
+ * from nation). Finally calls addEmptyDataFile for nation_partitioned.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ TestJoinQuery.setup();
+
+ // CTAS statement for the single-column partitioned customer table.
+ final String createCustomerParts =
+ "CREATE TABLE if not exists customer_parts " +
+ "(c_custkey INT4, c_name TEXT, c_address TEXT, c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT) " +
+ "PARTITION BY COLUMN (c_nationkey INT4) as " +
+ "SELECT c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey FROM customer;";
+ // CTAS statement for the two-column partitioned nation table.
+ final String createNationPartitioned =
+ "create table if not exists nation_partitioned (n_name text) " +
+ "partition by column(n_nationkey int4, n_regionkey int4) " +
+ "as select n_name, n_nationkey, n_regionkey from nation";
+
+ client.executeQuery(createCustomerParts);
+ client.executeQueryAndGetResult(createNationPartitioned);
+ addEmptyDataFile("nation_partitioned", true);
+ }
- public TestJoinOnPartitionedTables() {
- super(TajoConstants.DEFAULT_DATABASE_NAME);
+ /**
+ * Class-level cleanup: runs the TestJoinQuery teardown, then drops the customer_parts and
+ * nation_partitioned tables (with PURGE) if they exist.
+ */
+ @AfterClass
+ public static void classTearDown() throws ServiceException {
+ TestJoinQuery.classTearDown();
+ // NOTE(review): the drops below use the shared client after the parent teardown has run;
+ // confirm that the parent teardown leaves the client usable.
+ client.executeQuery("DROP TABLE IF EXISTS customer_parts PURGE");
+ client.executeQuery("DROP TABLE IF EXISTS nation_partitioned PURGE");
}
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
@Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
public void testPartitionTableJoinSmallTable() throws Exception {
- executeDDL("customer_ddl.sql", null);
- ResultSet res = executeFile("insert_into_customer.sql");
- res.close();
+ runSimpleTests();
+ }
- res = executeQuery();
- assertResultSet(res);
- res.close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void testNoProjectionJoinQual() throws Exception {
+ runSimpleTests();
+ }
- res = executeFile("selfJoinOfPartitionedTable.sql");
- assertResultSet(res, "selfJoinOfPartitionedTable.result");
- res.close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void testPartialFilterPushDown() throws Exception {
+ runSimpleTests();
+ }
- res = executeFile("testNoProjectionJoinQual.sql");
- assertResultSet(res, "testNoProjectionJoinQual.result");
- res.close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void testPartialFilterPushDownOuterJoin() throws Exception {
+ runSimpleTests();
+ }
- res = executeFile("testPartialFilterPushDown.sql");
- assertResultSet(res, "testPartialFilterPushDown.result");
- res.close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void testPartialFilterPushDownOuterJoin2() throws Exception {
+ runSimpleTests();
+ }
- res = executeFile("testPartialFilterPushDownOuterJoin.sql");
- assertResultSet(res, "testPartialFilterPushDownOuterJoin.result");
- res.close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void selfJoinOfPartitionedTable() throws Exception {
+ runSimpleTests();
+ }
- res = executeFile("testPartialFilterPushDownOuterJoin2.sql");
- assertResultSet(res, "testPartialFilterPushDownOuterJoin2.result");
- res.close();
+ // Runs the inline query declared in @SimpleTest: a UNION ALL of two customer_parts scans, each
+ // filtered by c_nationkey < 0 (aliased a), left-outer-joined with customer_parts b on c_custkey,
+ // with the additional join condition a.c_nationkey > 0. Both explain options are disabled.
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest(queries = {
+ @QuerySpec("select a.c_custkey, b.c_custkey from " +
+ " (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+ " union all " +
+ " select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
+ ") a " +
+ "left outer join customer_parts b " +
+ "on a.c_custkey = b.c_custkey " +
+ "and a.c_nationkey > 0")
+ })
+ public void testPartitionMultiplePartitionFilter() throws Exception {
+ runSimpleTests();
+ }
- executeString("DROP TABLE customer_parts PURGE").close();
+ // Both explain options are disabled via @Option; the body only delegates to runSimpleTests().
+ // @SimpleTest() declares no inline query (presumably resolved from resource files by method name; confirm).
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
+ runSimpleTests();
}
+ // Both explain options are disabled and sort = true is set via @Option (presumably the result is
+ // sorted before comparison; confirm). The body only delegates to runSimpleTests().
@Test
- public void testPartitionMultiplePartitionFilter() throws Exception {
- executeDDL("customer_ddl.sql", null);
- ResultSet res = executeFile("insert_into_customer.sql");
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+ @SimpleTest()
+ public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception {
+ runSimpleTests();
+ }
+
+ // Both explain options are disabled and sort = true is set via @Option (presumably the result is
+ // sorted before comparison; confirm). The body only delegates to runSimpleTests().
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+ @SimpleTest()
+ public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception {
+ runSimpleTests();
+ }
+
+ /**
+ * Left outer join of a small table (lineitem) with a large partitioned table whose partition
+ * filter (l_orderkey = 1000) matches no partition path, so key2 is expected to be null in every
+ * row. The partitioned table is created and filled here and is always dropped at the end.
+ */
+ @Test
+ public final void testCasebyCase1() throws Exception {
+ String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
+ executeString(
+ "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
+ "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
+ "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
+ "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
+ "partition by column(l_orderkey int4) ").close();
+
+ try {
+ // Close the result of the insert, as the sibling tests do, so it is not leaked.
+ executeString("insert overwrite into " + tableName +
+ " select l_partkey, l_suppkey, l_linenumber, \n" +
+ " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
+ " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
+ " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem").close();
+
+ ResultSet res = executeString(
+ "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
+ "left outer join " + tableName + " b " +
+ "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
+ );
+
+ try {
+ String expected = "key1,key2\n" +
+ "-------------------------------\n" +
+ "1,null\n" +
+ "1,null\n" +
+ "2,null\n" +
+ "3,null\n" +
+ "3,null\n";
+ assertEquals(expected, resultSetToString(res));
+ } finally {
+ // Release the result set even when the assertion above fails.
+ cleanupQuery(res);
+ }
+ } finally {
+ executeString("drop table " + tableName + " purge").close();
+ }
+ }
+
+ // TODO: This test is currently disabled (its @Test annotation is commented out below).
+ // The disabling should be reverted after resolving TAJO-1600.
+ //
+ // Creates a table partitioned by two text columns (col3, col4), fills it from orders using
+ // substrings of o_orderdate as the partition values, then left-outer-joins it with lineitem
+ // and checks the distinct col3 values with assertResultSet. The table is dropped in finally.
+// @Test
+ public final void testBroadcastMultiColumnPartitionTable() throws Exception {
+ String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable");
+ ResultSet res = testBase.execute(
+ "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) ");
res.close();
+ TajoTestingCluster cluster = testBase.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ // The table must be registered in the catalog before it is filled.
+ assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+ try {
+ res = executeString("insert overwrite into " + tableName
+ + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders");
+ res.close();
+
+ res = executeString(
+ "select distinct a.col3 from " + tableName + " as a " +
+ "left outer join lineitem b " +
+ "on a.col1 = b.l_orderkey order by a.col3"
+ );
+
+ assertResultSet(res);
+ cleanupQuery(res);
+ } finally {
+ executeString("drop table " + tableName + " purge");
+ }
+ }
- res = executeString(
- "select a.c_custkey, b.c_custkey from " +
- " (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
- " union all " +
- " select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " +
- ") a " +
- "left outer join customer_parts b " +
- "on a.c_custkey = b.c_custkey " +
- "and a.c_nationkey > 0"
- );
-
- String expected =
- "c_custkey,c_custkey\n" +
- "-------------------------------\n";
- assertEquals(expected, resultSetToString(res));
+ // Checks that a self join on a partitioned copy of nation (partitioned by n_nationkey) returns
+ // the same rows as the equivalent self join on the plain nation table, for n_nationkey in (1).
+ @Test
+ public final void testSelfJoin() throws Exception {
+ // NOTE(review): "paritioned" looks like a misspelling of "partitioned"; the name is only used
+ // through tableName within this method.
+ String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
+ ResultSet res = executeString(
+ "create table " + tableName + " (n_name text,"
+ + " n_comment text, n_regionkey int8) USING csv "
+ + "WITH ('csvfile.delimiter'='|')"
+ + "PARTITION BY column(n_nationkey int8)");
res.close();
+ assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+ try {
+ res = executeString(
+ "insert overwrite into " + tableName
+ + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
+ res.close();
+
+ // Reference result: the same join on the non-partitioned nation table.
+ res = executeString(
+ "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+ + " where a.n_nationkey in (1)");
+ String expected = resultSetToString(res);
+ res.close();
- executeString("DROP TABLE customer_parts PURGE").close();
+ // Result under test: the self join on the partitioned table.
+ res = executeString(
+ "select a.n_nationkey, a.n_name from " + tableName + " a join " + tableName +
+ " b on a.n_nationkey = b.n_nationkey "
+ + " where a.n_nationkey in (1)");
+ String resultSetData = resultSetToString(res);
+ res.close();
+
+ assertEquals(expected, resultSetData);
+ // NOTE(review): res was already closed above, so this cleanupQuery call looks redundant; confirm.
+ cleanupQuery(res);
+ } finally {
+ executeString("drop table " + tableName + " purge");
+ }
}
+ // Checks that a self join on a partitioned copy of orders (partitioned by o_orderdate,
+ // o_orderstatus and o_orderkey_mod) returns the same rows as the equivalent self join on the
+ // plain orders table, for one fixed partition-value combination.
@Test
- public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
- executeDDL("customer_ddl.sql", null);
- ResultSet res = executeFile("insert_into_customer.sql");
+ public final void testSelfJoin2() throws Exception {
+ /*
+ https://issues.apache.org/jira/browse/TAJO-1102
+ See the following case.
+ CREATE TABLE orders_partition
+ (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,
+ o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')
+ PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT);
+
+ select a.o_orderstatus, count(*) as cnt
+ from orders_partition a
+ inner join orders_partition b
+ on a.o_orderdate = b.o_orderdate
+ and a.o_orderstatus = b.o_orderstatus
+ and a.o_orderkey = b.o_orderkey
+ where a.o_orderdate='1995-02-21'
+ and a.o_orderstatus in ('F')
+ group by a.o_orderstatus;
+
+ Because of the where condition [where a.o_orderdate='1995-02-21' and a.o_orderstatus in ('F')],
+ the orders_partition table aliased as a is small and becomes a broadcast target.
+ */
+ String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders");
+ ResultSet res = executeString(
+ "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" +
+ "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING CSV WITH ('csvfile.delimiter'='|')\n" +
+ "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)");
res.close();
+ assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+ try {
+ // The third partition column, o_orderkey_mod, is filled with o_orderkey % 10.
+ res = executeString(
+ "insert overwrite into " + tableName +
+ " select o_orderkey, o_custkey, o_totalprice, " +
+ " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " +
+ " from orders ");
+ res.close();
+
+ // Reference result: the same join and filters on the non-partitioned orders table.
+ res = executeString(
+ "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " +
+ "from orders a " +
+ "join orders b on a.o_orderkey = b.o_orderkey " +
+ "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" +
+ " order by a.o_orderkey"
+ );
+ String expected = resultSetToString(res);
+ res.close();
+
+ // Result under test: the self join on the partitioned table, filtered on its partition columns.
+ res = executeString(
+ "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " +
+ "from " + tableName +
+ " a join " + tableName + " b on a.o_orderkey = b.o_orderkey " +
+ "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " +
+ " order by a.o_orderkey"
+ );
+ String resultSetData = resultSetToString(res);
+ res.close();
- res = executeQuery();
- assertResultSet(res);
+ // NOTE(review): res was already closed above, so this cleanupQuery call looks redundant; confirm.
+ cleanupQuery(res);
+ assertEquals(expected, resultSetData);
+ } finally {
+ executeString("drop table " + tableName + " purge");
+ }
+ }
+
+ // Creates and fills the table defined by customer_partition_ddl.sql and
+ // insert_into_customer_partition.sql, runs the simple tests with both explain options disabled,
+ // and always drops customer_broad_parts afterwards.
+ @Test
+ @Option(withExplain = false, withExplainGlobal = false, parameterized = true)
+ @SimpleTest()
+ public final void testBroadcastPartitionTable() throws Exception {
+ // If all tables participate in the BROADCAST JOIN, there is some missing data.
+ executeDDL("customer_partition_ddl.sql", null);
+ ResultSet res = executeFile("insert_into_customer_partition.sql");
res.close();
- executeString("DROP TABLE customer_parts PURGE").close();
+ try {
+ runSimpleTests();
+ } finally {
+ // Drop the table even when the simple tests fail.
+ executeString("DROP TABLE customer_broad_parts PURGE");
+ }
}
}