You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/19 01:26:05 UTC
tajo git commit: TAJO-1368: Exceptions during processing nested union
queries
Repository: tajo
Updated Branches:
refs/heads/master a9ae3cab6 -> 725448c52
TAJO-1368: Exceptions during processing nested union queries
Closes #402
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/725448c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/725448c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/725448c5
Branch: refs/heads/master
Commit: 725448c5249cd8691ea167f595237ed7bcc22293
Parents: a9ae3ca
Author: Jihun Kang <ji...@apache.org>
Authored: Thu Mar 19 09:24:55 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Thu Mar 19 09:24:55 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../engine/planner/global/GlobalPlanner.java | 39 +++++++++---
.../java/org/apache/tajo/querymaster/Stage.java | 3 +-
.../apache/tajo/engine/query/TestCTASQuery.java | 28 +++++++++
.../tajo/engine/query/TestUnionQuery.java | 64 +++++++++++++++++++-
.../TestCTASQuery/CtasWithMultipleUnions.sql | 12 ++++
.../testCtasWithMultipleUnions.sql | 1 +
7 files changed, 140 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4875cab..56d77b3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -32,6 +32,9 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1368: Exceptions during processing nested union queries.
+ (jihun)
+
TAJO-1405: Fix some illegal way of usages on connection pool.
(Contributed by navis, Committed by Keuntae Park)
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 15d8034..d2ac6cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -821,7 +822,7 @@ public class GlobalPlanner {
public static boolean hasUnionChild(UnaryNode node) {
- // there are two cases:
+ // there are three cases:
//
// The first case is:
//
@@ -835,9 +836,15 @@ public class GlobalPlanner {
// select avg(..) from (select ... UNION select ) T
//
// We can generalize this case as 'a shuffle required operator on the top of union'.
+ //
+ // The third case is:
+ //
+ // create table ... as select * from ( select ... ) a union all select * from ( select ... ) b
- if (node.getChild() instanceof UnaryNode) { // first case
- UnaryNode child = node.getChild();
+ LogicalNode childNode = node.getChild();
+
+ if (childNode instanceof UnaryNode) { // first case
+ UnaryNode child = (UnaryNode) childNode;
if (child.getChild().getType() == NodeType.PROJECTION) {
child = child.getChild();
@@ -848,9 +855,11 @@ public class GlobalPlanner {
return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
}
- } else if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) { // second case
+ } else if (childNode.getType() == NodeType.TABLE_SUBQUERY) { // second case
TableSubQueryNode tableSubQuery = node.getChild();
return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+ } else if (childNode.getType() == NodeType.UNION) { // third case
+ return true;
}
return false;
@@ -1156,6 +1165,9 @@ public class GlobalPlanner {
((TableSubQueryNode)child).getSubQuery().getType() == NodeType.UNION) {
MasterPlan masterPlan = context.plan;
for (DataChannel dataChannel : masterPlan.getIncomingChannels(execBlock.getId())) {
+ // The output of this data channel is stored in the staging directory, but RawFile, the default file type,
+ // does not support distributed file systems, so the store type is changed to one that does (CSV).
+ dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
ProjectionNode copy = PlannerUtil.clone(plan, node);
@@ -1371,18 +1383,28 @@ public class GlobalPlanner {
LogicalPlan.QueryBlock rightQueryBlock = plan.getBlock(node.getRightChild());
LogicalNode rightChild = visit(context, plan, rightQueryBlock, rightQueryBlock.getRoot(), stack);
stack.pop();
+
+ MasterPlan masterPlan = context.getPlan();
List<ExecutionBlock> unionBlocks = Lists.newArrayList();
List<ExecutionBlock> queryBlockBlocks = Lists.newArrayList();
ExecutionBlock leftBlock = context.execBlockMap.remove(leftChild.getPID());
ExecutionBlock rightBlock = context.execBlockMap.remove(rightChild.getPID());
- if (leftChild.getType() == NodeType.UNION) {
+
+ // A child that is a union, or a table subquery wrapping a union, needs the unnecessary nodes between it and its parent in the query tree eliminated.
+ boolean leftUnion = (leftChild.getType() == NodeType.UNION) ||
+ ((leftChild.getType() == NodeType.TABLE_SUBQUERY) &&
+ (((TableSubQueryNode)leftChild).getSubQuery().getType() == NodeType.UNION));
+ boolean rightUnion = (rightChild.getType() == NodeType.UNION) ||
+ (rightChild.getType() == NodeType.TABLE_SUBQUERY) &&
+ (((TableSubQueryNode)rightChild).getSubQuery().getType() == NodeType.UNION);
+ if (leftUnion) {
unionBlocks.add(leftBlock);
} else {
queryBlockBlocks.add(leftBlock);
}
- if (rightChild.getType() == NodeType.UNION) {
+ if (rightUnion) {
unionBlocks.add(rightBlock);
} else {
queryBlockBlocks.add(rightBlock);
@@ -1396,7 +1418,8 @@ public class GlobalPlanner {
}
for (ExecutionBlock childBlocks : unionBlocks) {
- for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlocks)) {
+ for (ExecutionBlock grandChildBlock : masterPlan.getChilds(childBlocks)) {
+ masterPlan.disconnect(grandChildBlock, childBlocks);
queryBlockBlocks.add(grandChildBlock);
}
}
@@ -1404,7 +1427,7 @@ public class GlobalPlanner {
for (ExecutionBlock childBlocks : queryBlockBlocks) {
DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_SHUFFLE, 1);
channel.setStoreType(storeType);
- context.plan.addConnect(channel);
+ masterPlan.addConnect(channel);
}
context.execBlockMap.put(node.getPID(), execBlock);
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 4e1f716..20add9f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -780,7 +780,8 @@ public class Stage implements EventHandler<StageEvent> {
try {
// Union operator does not require actual query processing. It is performed logically.
if (execBlock.hasUnion()) {
- stage.finalizeStats();
+ // Although a union operator is not actually processed, its stage must still handle the completion event.
+ stage.complete();
state = StageState.SUCCEEDED;
} else {
ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index e93d214..18c9fbc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -182,6 +182,34 @@ public class TestCTASQuery extends QueryTestCaseBase {
}
@Test
+ public final void testCtasWithMultipleUnions() throws Exception {
+ ResultSet res = executeFile("CtasWithMultipleUnions.sql");
+ res.close();
+
+ ResultSet res2 = executeQuery();
+ String actual = resultSetToString(res2);
+ res2.close();
+
+ String expected = "c_custkey,c_nationkey\n" +
+ "-------------------------------\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n";
+
+ assertEquals(expected, actual);
+
+ TableDesc desc = client.getTableDesc(CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1)));
+ assertNotNull(desc);
+ }
+
+ @Test
public final void testCtasWithStoreType() throws Exception {
ResultSet res = executeFile("CtasWithStoreType.sql");
res.close();
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
index d46d110..03a80d1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
@@ -450,4 +450,66 @@ public class TestUnionQuery extends QueryTestCaseBase {
assertEquals(expected, resultSetToString(res));
res.close();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testTajo1368Case1() throws Exception {
+ ResultSet res = executeString(
+ "select * from " +
+ " (select c_custkey, c_nationkey from customer where c_nationkey < 0 " +
+ " union all " +
+ " select c_custkey, c_nationkey from customer where c_nationkey > 0 " +
+ ") a " +
+ "union all " +
+ "select * from " +
+ " (select c_custkey, c_nationkey from customer where c_nationkey < 0 " +
+ " union all " +
+ " select c_custkey, c_nationkey from customer where c_nationkey > 0 " +
+ ") b ");
+
+ String expected = "c_custkey,c_nationkey\n" +
+ "-------------------------------\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n";
+
+ assertEquals(expected, resultSetToString(res));
+ res.close();
+ }
+
+ @Test
+ public void testTajo1368Case2() throws Exception {
+ ResultSet res = executeString("select * from ( "+
+ "select c_custkey, c_nationkey from ( " +
+ "select c_custkey, c_nationkey from ( " +
+ "select c_custkey, c_nationkey from customer) a " +
+ "union all " +
+ "select c_custkey, c_nationkey from ( " +
+ "select c_custkey, c_nationkey from customer) a " +
+ " ) a " +
+ " ) a ");
+
+ String expected = "c_custkey,c_nationkey\n" +
+ "-------------------------------\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n" +
+ "1,15\n" +
+ "2,13\n" +
+ "3,1\n" +
+ "4,4\n" +
+ "5,3\n";
+
+ assertEquals(expected, resultSetToString(res));
+ res.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
new file mode 100644
index 0000000..7176a2a
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
@@ -0,0 +1,12 @@
+create table testCtasWithMultipleUnions as
+select * from
+ (select c_custkey, c_nationkey from customer where c_nationkey < 0
+ union all
+ select c_custkey, c_nationkey from customer where c_nationkey > 0
+) a
+union all
+select * from
+ (select c_custkey, c_nationkey from customer where c_nationkey < 0
+ union all
+ select c_custkey, c_nationkey from customer where c_nationkey > 0
+) b;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
new file mode 100644
index 0000000..71b7034
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
@@ -0,0 +1 @@
+select * from testCtasWithMultipleUnions;
\ No newline at end of file