You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/19 01:26:05 UTC

tajo git commit: TAJO-1368: Exceptions during processing nested union queries

Repository: tajo
Updated Branches:
  refs/heads/master a9ae3cab6 -> 725448c52


TAJO-1368: Exceptions during processing nested union queries

Closes #402


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/725448c5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/725448c5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/725448c5

Branch: refs/heads/master
Commit: 725448c5249cd8691ea167f595237ed7bcc22293
Parents: a9ae3ca
Author: Jihun Kang <ji...@apache.org>
Authored: Thu Mar 19 09:24:55 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Thu Mar 19 09:24:55 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../engine/planner/global/GlobalPlanner.java    | 39 +++++++++---
 .../java/org/apache/tajo/querymaster/Stage.java |  3 +-
 .../apache/tajo/engine/query/TestCTASQuery.java | 28 +++++++++
 .../tajo/engine/query/TestUnionQuery.java       | 64 +++++++++++++++++++-
 .../TestCTASQuery/CtasWithMultipleUnions.sql    | 12 ++++
 .../testCtasWithMultipleUnions.sql              |  1 +
 7 files changed, 140 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4875cab..56d77b3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -32,6 +32,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1368: Exceptions during processing nested union queries.
+    (jihun)
+
     TAJO-1405: Fix some illegal way of usages on connection pool. 
     (Contributed by navis, Committed by Keuntae Park)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 15d8034..d2ac6cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -821,7 +822,7 @@ public class GlobalPlanner {
 
   public static boolean hasUnionChild(UnaryNode node) {
 
-    // there are two cases:
+    // there are three cases:
     //
     // The first case is:
     //
@@ -835,9 +836,15 @@ public class GlobalPlanner {
     // select avg(..) from (select ... UNION select ) T
     //
     // We can generalize this case as 'a shuffle required operator on the top of union'.
+    //
+    // The third case is:
+    //
+    // create table t as select * from ( select ... ) a union all select * from ( select ... ) b
 
-    if (node.getChild() instanceof UnaryNode) { // first case
-      UnaryNode child = node.getChild();
+    LogicalNode childNode = node.getChild();
+
+    if (childNode instanceof UnaryNode) { // first case
+      UnaryNode child = (UnaryNode) childNode;
 
       if (child.getChild().getType() == NodeType.PROJECTION) {
         child = child.getChild();
@@ -848,9 +855,11 @@ public class GlobalPlanner {
         return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
       }
 
-    } else if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) { // second case
+    } else if (childNode.getType() == NodeType.TABLE_SUBQUERY) { // second case
       TableSubQueryNode tableSubQuery = node.getChild();
       return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+    } else if (childNode.getType() == NodeType.UNION) { // third case
+      return true;
     }
 
     return false;
@@ -1156,6 +1165,9 @@ public class GlobalPlanner {
           ((TableSubQueryNode)child).getSubQuery().getType() == NodeType.UNION) {
         MasterPlan masterPlan = context.plan;
         for (DataChannel dataChannel : masterPlan.getIncomingChannels(execBlock.getId())) {
+          // This data channel will be stored in the staging directory, but RawFile, the default file type, does not
+          // support distributed file systems, so the store type must be switched to one that does (CSV).
+          dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
           ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
 
           ProjectionNode copy = PlannerUtil.clone(plan, node);
@@ -1371,18 +1383,28 @@ public class GlobalPlanner {
       LogicalPlan.QueryBlock rightQueryBlock = plan.getBlock(node.getRightChild());
       LogicalNode rightChild = visit(context, plan, rightQueryBlock, rightQueryBlock.getRoot(), stack);
       stack.pop();
+      
+      MasterPlan masterPlan = context.getPlan();
 
       List<ExecutionBlock> unionBlocks = Lists.newArrayList();
       List<ExecutionBlock> queryBlockBlocks = Lists.newArrayList();
 
       ExecutionBlock leftBlock = context.execBlockMap.remove(leftChild.getPID());
       ExecutionBlock rightBlock = context.execBlockMap.remove(rightChild.getPID());
-      if (leftChild.getType() == NodeType.UNION) {
+
+      // These union types need to eliminate unnecessary nodes between parent and child node of query tree.
+      boolean leftUnion = (leftChild.getType() == NodeType.UNION) ||
+          ((leftChild.getType() == NodeType.TABLE_SUBQUERY) &&
+          (((TableSubQueryNode)leftChild).getSubQuery().getType() == NodeType.UNION));
+      boolean rightUnion = (rightChild.getType() == NodeType.UNION) ||
+          (rightChild.getType() == NodeType.TABLE_SUBQUERY) &&
+          (((TableSubQueryNode)rightChild).getSubQuery().getType() == NodeType.UNION);
+      if (leftUnion) {
         unionBlocks.add(leftBlock);
       } else {
         queryBlockBlocks.add(leftBlock);
       }
-      if (rightChild.getType() == NodeType.UNION) {
+      if (rightUnion) {
         unionBlocks.add(rightBlock);
       } else {
         queryBlockBlocks.add(rightBlock);
@@ -1396,7 +1418,8 @@ public class GlobalPlanner {
       }
 
       for (ExecutionBlock childBlocks : unionBlocks) {
-        for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlocks)) {
+        for (ExecutionBlock grandChildBlock : masterPlan.getChilds(childBlocks)) {
+          masterPlan.disconnect(grandChildBlock, childBlocks);
           queryBlockBlocks.add(grandChildBlock);
         }
       }
@@ -1404,7 +1427,7 @@ public class GlobalPlanner {
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
         DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_SHUFFLE, 1);
         channel.setStoreType(storeType);
-        context.plan.addConnect(channel);
+        masterPlan.addConnect(channel);
       }
 
       context.execBlockMap.put(node.getPID(), execBlock);

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 4e1f716..20add9f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -780,7 +780,8 @@ public class Stage implements EventHandler<StageEvent> {
       try {
         // Union operator does not require actual query processing. It is performed logically.
         if (execBlock.hasUnion()) {
-          stage.finalizeStats();
+          // Although the union operator is not actually processed, it must still handle the completion event.
+          stage.complete();
           state = StageState.SUCCEEDED;
         } else {
           ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index e93d214..18c9fbc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -182,6 +182,34 @@ public class TestCTASQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testCtasWithMultipleUnions() throws Exception {
+    ResultSet res = executeFile("CtasWithMultipleUnions.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    String actual = resultSetToString(res2);
+    res2.close();
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, actual);
+
+    TableDesc desc = client.getTableDesc(CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1)));
+    assertNotNull(desc);
+  }
+
+  @Test
   public final void testCtasWithStoreType() throws Exception {
     ResultSet res = executeFile("CtasWithStoreType.sql");
     res.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
index d46d110..03a80d1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
@@ -450,4 +450,66 @@ public class TestUnionQuery extends QueryTestCaseBase {
     assertEquals(expected, resultSetToString(res));
     res.close();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testTajo1368Case1() throws Exception {
+    ResultSet res = executeString(
+        "select * from " +
+            "   (select c_custkey, c_nationkey from customer where c_nationkey < 0 " +
+            "   union all " +
+            "   select c_custkey, c_nationkey from customer where c_nationkey > 0 " +
+            ") a " +
+            "union all " +
+            "select * from " +
+            "   (select c_custkey, c_nationkey from customer where c_nationkey < 0 " +
+            "   union all " +
+            "   select c_custkey, c_nationkey from customer where c_nationkey > 0 " +
+            ") b ");
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, resultSetToString(res));
+    res.close();
+  }
+
+  @Test
+  public void testTajo1368Case2() throws Exception {
+    ResultSet res = executeString("select * from ( "+
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from customer) a " +
+        "union all " +
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from customer) a " +
+        " ) a " +
+        " ) a ");
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, resultSetToString(res));
+    res.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
new file mode 100644
index 0000000..7176a2a
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql
@@ -0,0 +1,12 @@
+create table testCtasWithMultipleUnions as
+select * from
+   (select c_custkey, c_nationkey from customer where c_nationkey < 0
+   union all
+   select c_custkey, c_nationkey from customer where c_nationkey > 0
+) a
+union all
+select * from
+   (select c_custkey, c_nationkey from customer where c_nationkey < 0
+   union all
+   select c_custkey, c_nationkey from customer where c_nationkey > 0
+) b;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
new file mode 100644
index 0000000..71b7034
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql
@@ -0,0 +1 @@
+select * from testCtasWithMultipleUnions;
\ No newline at end of file