You are viewing a plain-text version of this content; the canonical link to it (originally given as "here") was not preserved in this copy.
Posted to commits@drill.apache.org by am...@apache.org on 2015/04/11 01:31:21 UTC

drill git commit: DRILL-2612: Union-All can work even when the right input side comes from an empty data source

Repository: drill
Updated Branches:
  refs/heads/master 38eb383a9 -> 093626e33


DRILL-2612: Union-All can work even when the right input side comes from an empty data source


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/093626e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/093626e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/093626e3

Branch: refs/heads/master
Commit: 093626e33671510eb3952a09f6fefcb33bc90f18
Parents: 38eb383
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Mon Mar 30 10:43:49 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Apr 10 16:28:20 2015 -0700

----------------------------------------------------------------------
 .../impl/union/UnionAllRecordBatch.java         | 79 +++++++++++++++++---
 .../java/org/apache/drill/TestUnionAll.java     | 21 ++++++
 2 files changed, 90 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/093626e3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index b472890..52b1794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -283,6 +283,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     private OneSideInput rightSide;
     private IterOutcome upstream = IterOutcome.NOT_YET;
     private boolean leftIsFinish = false;
+    private boolean rightIsFinish = false;
 
     // These two schemas are obtained from the first record batches of the left and right inputs
     // They are used to check if the schema is changed between recordbatches
@@ -298,31 +299,63 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     public IterOutcome nextBatch() throws SchemaChangeException {
       if(upstream == RecordBatch.IterOutcome.NOT_YET) {
         IterOutcome iterLeft = leftSide.nextBatch();
-        if(iterLeft != IterOutcome.OK_NEW_SCHEMA) {
-          upstream = iterLeft;
-          return upstream;
+        switch(iterLeft) {
+          case OK_NEW_SCHEMA:
+            break;
+
+          case STOP:
+          case OUT_OF_MEMORY:
+            return iterLeft;
+
+          case NONE:
+            throw new SchemaChangeException("The left input of Union-All should not come from an empty data source");
+
+          default:
+            throw new IllegalStateException(String.format("Unknown state %s.", iterLeft));
         }
 
         IterOutcome iterRight = rightSide.nextBatch();
-        if(iterRight != IterOutcome.OK_NEW_SCHEMA) {
-          upstream = iterRight;
-          return upstream;
+        switch(iterRight) {
+          case OK_NEW_SCHEMA:
+            // Unless there is no record batch on the left side of the inputs,
+            // always start processing from the left side
+            unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+            inferOutputFields();
+            break;
+
+          case NONE:
+            // If the right input side comes from an empty data source,
+            // use the left input side's schema directly
+            unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+            inferOutputFieldsFromLeftSide();
+            rightIsFinish = true;
+            break;
+
+          case STOP:
+          case OUT_OF_MEMORY:
+            return iterRight;
+
+          default:
+            throw new IllegalStateException(String.format("Unknown state %s.", iterRight));
         }
 
-        unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-        inferOutputFields();
         upstream = IterOutcome.OK_NEW_SCHEMA;
         return upstream;
       } else {
         unionAllRecordBatch.clearCurrentRecordBatch();
 
-        if(leftIsFinish) {
+        if(leftIsFinish && rightIsFinish) {
+          upstream = IterOutcome.NONE;
+          return upstream;
+        } else if(leftIsFinish) {
           IterOutcome iterOutcome = rightSide.nextBatch();
 
           switch(iterOutcome) {
+            case NONE:
+              rightIsFinish = true;
+              // fall through
             case STOP:
             case OUT_OF_MEMORY:
-            case NONE:
               upstream = iterOutcome;
               return upstream;
 
@@ -340,6 +373,23 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
             default:
               throw new IllegalStateException(String.format("Unknown state %s.", upstream));
           }
+        } else if(rightIsFinish) {
+          IterOutcome iterOutcome = leftSide.nextBatch();
+          switch(iterOutcome) {
+            case STOP:
+            case OUT_OF_MEMORY:
+            case NONE:
+              upstream = iterOutcome;
+              return upstream;
+
+            case OK:
+              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+              upstream = iterOutcome;
+              return upstream;
+
+            default:
+              throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+          }
         } else {
           IterOutcome iterOutcome = leftSide.nextBatch();
 
@@ -425,6 +475,15 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
       assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
     }
 
+    private void inferOutputFieldsFromLeftSide() { // Derives outputFields from the left input's schema alone; used when the right input's first call returns NONE
+      outputFields = Lists.newArrayList(); // replaces any previously inferred output fields
+      Iterator<MaterializedField> iter = leftSide.getRecordBatch().getSchema().iterator();
+      while(iter.hasNext()) {
+        MaterializedField field = iter.next();
+        outputFields.add(MaterializedField.create(field.getPath(), field.getType())); // fresh field with the same path and type as the left-side field
+      }
+    }
+
     public List<MaterializedField> getOutputFields() {
       if(outputFields == null) {
         throw new NullPointerException("Output fields have not been inferred");

http://git-wip-us.apache.org/repos/asf/drill/blob/093626e3/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index d31cd19..bcffe0c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -478,4 +478,25 @@ public class TestUnionAll extends BaseTestQuery{
         .baselineValues((long) 4)
         .build().run();
   }
+
+  @Test // see DRILL-2612: Union-All whose right input comes from an empty data source
+  public void testUnionAllRightEmptyJson() throws Exception {
+    String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString(); // right input; presumably a JSON file with no records (per its name) - confirm
+    String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); // left input; supplies the expected "key" values
+
+    String queryRightEmpty = String.format(
+        "select key from dfs_test.`%s` " +
+        "union all " +
+        "select key from dfs_test.`%s`",
+        rootSimple, // substituted into the left branch of the UNION ALL
+        rootEmpty); // substituted into the right branch of the UNION ALL
+
+    testBuilder()
+      .sqlQuery(queryRightEmpty)
+      .unOrdered()
+      .baselineColumns("key")
+      .baselineValues(true) // expected rows: only the two values read from the left input
+      .baselineValues(false)
+      .build().run();
+  }
 }
\ No newline at end of file