You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/04/06 07:10:23 UTC

drill git commit: DRILL-2591: In UnionAllRecordBatch, the mechanism to detect schema change is corrected

Repository: drill
Updated Branches:
  refs/heads/master 862ab91e9 -> a53e12336


DRILL-2591: In UnionAllRecordBatch, the mechanism to detect schema change is corrected


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a53e1233
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a53e1233
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a53e1233

Branch: refs/heads/master
Commit: a53e12336c29b421f1df51da480af9a65d70bb72
Parents: 862ab91
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Fri Mar 27 11:37:07 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Apr 5 21:43:41 2015 -0700

----------------------------------------------------------------------
 .../impl/union/UnionAllRecordBatch.java         | 31 ++++++++--
 .../physical/visitor/FinalColumnReorderer.java  | 20 ++++---
 .../java/org/apache/drill/TestUnionAll.java     | 63 +++++++++++++++++++-
 .../src/test/resources/store/json/dateData.json | 12 ++++
 .../test/resources/store/json/timeStmpData.json | 14 +++++
 .../testframework/testUnionAllQueries/q18.tsv   | 15 +++++
 6 files changed, 141 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 806104a..61de3a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -284,6 +284,11 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     private IterOutcome upstream = IterOutcome.NOT_YET;
     private boolean leftIsFinish = false;
 
+    // These two schemas are obtained from the first record batches of the left and right inputs.
+    // They are used to check whether the schema changes between record batches.
+    private BatchSchema leftSchema;
+    private BatchSchema rightSchema;
+
     public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
       this.unionAllRecordBatch = unionAllRecordBatch;
       leftSide = new OneSideInput(left);
@@ -321,13 +326,20 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
               upstream = iterOutcome;
               return upstream;
 
+            case OK_NEW_SCHEMA:
+              if(!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
+                throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
+              }
+
+              upstream = IterOutcome.OK;
+              // fall through
             case OK:
               unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
               upstream = iterOutcome;
               return upstream;
 
             default:
-              throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
+              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
           }
         } else {
           IterOutcome iterOutcome = leftSide.nextBatch();
@@ -338,7 +350,14 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
               upstream = iterOutcome;
               return upstream;
 
-            case OK:
+            case OK_NEW_SCHEMA:
+              if(!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
+                throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+              }
+
+              upstream = IterOutcome.OK;
+              // fall through
+              case OK:
               unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
               upstream = iterOutcome;
               return upstream;
@@ -350,7 +369,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
               return upstream;
 
             default:
-              throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
           }
         }
       }
@@ -360,8 +379,10 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     // where the output type is chosen based on DRILL's implicit casting rules
     private void inferOutputFields() {
       outputFields = Lists.newArrayList();
-      Iterator<MaterializedField> leftIter = leftSide.getRecordBatch().getSchema().iterator();
-      Iterator<MaterializedField> rightIter = rightSide.getRecordBatch().getSchema().iterator();
+      leftSchema = leftSide.getRecordBatch().getSchema();
+      rightSchema = rightSide.getRecordBatch().getSchema();
+      Iterator<MaterializedField> leftIter = leftSchema.iterator();
+      Iterator<MaterializedField> rightIter = rightSchema.iterator();
 
       int index = 1;
       while(leftIter.hasNext() && rightIter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
index 1aa033b..375d69f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
@@ -45,14 +45,10 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
   @Override
   public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
     Prel newChild = ((Prel) prel.getChild()).accept(this, value);
-    return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild)));
+    return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild, true)));
   }
 
   private Prel addTrivialOrderedProjectPrel(Prel prel) {
-    if (!prel.needsFinalColumnReordering()) {
-      return prel;
-    }
-
     RelDataType t = prel.getRowType();
 
     RexBuilder b = prel.getCluster().getRexBuilder();
@@ -64,16 +60,24 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
       return prel;
     }
 
-    for (int i =0; i < projectCount; i++) {
+    for (int i = 0; i < projectCount; i++) {
       projections.add(b.makeInputRef(prel, i));
     }
     return new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, prel.getRowType());
   }
 
+  private Prel addTrivialOrderedProjectPrel(Prel prel, boolean checkNecessity) {
+    if(checkNecessity && !prel.needsFinalColumnReordering()) {
+      return prel;
+    } else {
+      return addTrivialOrderedProjectPrel(prel);
+    }
+  }
+
   @Override
   public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
     Prel newChild = ((Prel) prel.getChild()).accept(this, null);
-    return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild)));
+    return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild, true)));
   }
 
   @Override
@@ -105,7 +109,7 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
 
       boolean needProjectBelowUnion = !(p instanceof ProjectPrel);
       if(needProjectBelowUnion) {
-        child = addTrivialOrderedProjectPrel(child);
+        child = addTrivialOrderedProjectPrel(child, false);
       }
 
       children.add(child);

http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index fcf5c9f..fee1d6a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -25,7 +25,7 @@ import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.junit.Test;
 
 public class TestUnionAll extends BaseTestQuery{
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
 
   @Test  // Simple Union-All over two scans
   public void testUnionAll1() throws Exception {
@@ -368,4 +368,65 @@ public class TestUnionAll extends BaseTestQuery{
 
     test(query);
   }
+
+  @Test // see DRILL-2591
+  public void testDateAndTimestampJson() throws Exception {
+    String rootDate = FileUtils.getResourceAsFile("/store/json/dateData.json").toURI().toString();
+    String rootTimpStmp = FileUtils.getResourceAsFile("/store/json/timeStmpData.json").toURI().toString();
+
+    String query = String.format(
+        "(select max(key) as key from dfs_test.`%s` " +
+        "union all " +
+        "select key from dfs_test.`%s`)", rootDate, rootTimpStmp);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .csvBaselineFile("testframework/testUnionAllQueries/q18.tsv")
+        .baselineTypes(TypeProtos.MinorType.VARCHAR)
+        .baselineColumns("key")
+        .build().run();
+  }
+
+  @Test // see DRILL-2637
+  public void testUnionAllOneInputContainsAggFunction() throws Exception {
+    String root = FileUtils.getResourceAsFile("/multilevel/csv/1994/Q1/orders_94_q1.csv").toURI().toString();
+    String query1 = String.format("select * from ((select count(c1) as ct from (select columns[0] c1 from dfs.`%s`)) \n" +
+        "union all \n" +
+        "(select columns[0] c2 from dfs.`%s`)) order by ct limit 3", root, root);
+
+    String query2 = String.format("select * from ((select columns[0] ct from dfs.`%s`)\n" +
+        "union all \n" +
+        "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct limit 3", root, root);
+
+    String query3 = String.format("select * from ((select count(c1) as ct from (select columns[0] c1 from dfs.`%s`))\n" +
+        "union all \n" +
+        "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct", root, root);
+
+    testBuilder()
+        .sqlQuery(query1)
+        .ordered()
+        .baselineColumns("ct")
+        .baselineValues((long) 10)
+        .baselineValues((long) 66)
+        .baselineValues((long) 99)
+        .build().run();
+
+    testBuilder()
+        .sqlQuery(query2)
+        .ordered()
+        .baselineColumns("ct")
+        .baselineValues((long) 10)
+        .baselineValues((long) 66)
+        .baselineValues((long) 99)
+        .build().run();
+
+    testBuilder()
+        .sqlQuery(query3)
+         .ordered()
+         .baselineColumns("ct")
+         .baselineValues((long) 10)
+         .baselineValues((long) 10)
+         .build().run();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/dateData.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/dateData.json b/exec/java-exec/src/test/resources/store/json/dateData.json
new file mode 100644
index 0000000..d15d9bc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/dateData.json
@@ -0,0 +1,12 @@
+{"key":"2009-03-03"}
+{"key":"2001-08-27"}
+{"key":"2011-07-26"}
+{"key":"1970-09-02"}
+{"key":"1983-04-24"}
+{"key":"2007-02-01"}
+{"key":"1977-08-03"}
+{"key":"1962-05-14"}
+{"key":"1950-02-16"}
+{"key":"1983-09-05"}
+{"key":"2000-09-09"}
+{"key":"1960-08-18"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/timeStmpData.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/timeStmpData.json b/exec/java-exec/src/test/resources/store/json/timeStmpData.json
new file mode 100644
index 0000000..8c150dd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/timeStmpData.json
@@ -0,0 +1,14 @@
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
new file mode 100644
index 0000000..ccf0d35
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
@@ -0,0 +1,15 @@
+2011-07-26
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
\ No newline at end of file