You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/04/06 07:10:23 UTC
drill git commit: DRILL-2591: In UnionAllRecordBatch,
the mechanism to detect schema change is corrected
Repository: drill
Updated Branches:
refs/heads/master 862ab91e9 -> a53e12336
DRILL-2591: In UnionAllRecordBatch, the mechanism to detect schema change is corrected
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a53e1233
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a53e1233
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a53e1233
Branch: refs/heads/master
Commit: a53e12336c29b421f1df51da480af9a65d70bb72
Parents: 862ab91
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Fri Mar 27 11:37:07 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Apr 5 21:43:41 2015 -0700
----------------------------------------------------------------------
.../impl/union/UnionAllRecordBatch.java | 31 ++++++++--
.../physical/visitor/FinalColumnReorderer.java | 20 ++++---
.../java/org/apache/drill/TestUnionAll.java | 63 +++++++++++++++++++-
.../src/test/resources/store/json/dateData.json | 12 ++++
.../test/resources/store/json/timeStmpData.json | 14 +++++
.../testframework/testUnionAllQueries/q18.tsv | 15 +++++
6 files changed, 141 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 806104a..61de3a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -284,6 +284,11 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
private IterOutcome upstream = IterOutcome.NOT_YET;
private boolean leftIsFinish = false;
+ // These two schemas are captured from the first record batches of the left and right inputs.
+ // Later OK_NEW_SCHEMA batches are compared against them to detect a real schema change between record batches.
+ private BatchSchema leftSchema;
+ private BatchSchema rightSchema;
+
public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
this.unionAllRecordBatch = unionAllRecordBatch;
leftSide = new OneSideInput(left);
@@ -321,13 +326,20 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
upstream = iterOutcome;
return upstream;
+ case OK_NEW_SCHEMA:
+ if(!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
+ throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
+ }
+
+ upstream = IterOutcome.OK;
+ // fall through
case OK:
unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
upstream = iterOutcome;
return upstream;
default:
- throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
+ throw new IllegalStateException(String.format("Unknown state %s.", upstream));
}
} else {
IterOutcome iterOutcome = leftSide.nextBatch();
@@ -338,7 +350,14 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
upstream = iterOutcome;
return upstream;
- case OK:
+ case OK_NEW_SCHEMA:
+ if(!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
+ throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+ }
+
+ upstream = IterOutcome.OK;
+ // fall through
+ case OK:
unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
upstream = iterOutcome;
return upstream;
@@ -350,7 +369,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
return upstream;
default:
- throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+ throw new IllegalStateException(String.format("Unknown state %s.", upstream));
}
}
}
@@ -360,8 +379,10 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
// where the output type is chosen based on DRILL's implicit casting rules
private void inferOutputFields() {
outputFields = Lists.newArrayList();
- Iterator<MaterializedField> leftIter = leftSide.getRecordBatch().getSchema().iterator();
- Iterator<MaterializedField> rightIter = rightSide.getRecordBatch().getSchema().iterator();
+ leftSchema = leftSide.getRecordBatch().getSchema();
+ rightSchema = rightSide.getRecordBatch().getSchema();
+ Iterator<MaterializedField> leftIter = leftSchema.iterator();
+ Iterator<MaterializedField> rightIter = rightSchema.iterator();
int index = 1;
while(leftIter.hasNext() && rightIter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
index 1aa033b..375d69f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
@@ -45,14 +45,10 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
Prel newChild = ((Prel) prel.getChild()).accept(this, value);
- return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild)));
+ return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild, true))); // true: add the trivial ordered project only if newChild.needsFinalColumnReordering()
}
private Prel addTrivialOrderedProjectPrel(Prel prel) {
- if (!prel.needsFinalColumnReordering()) {
- return prel;
- }
-
RelDataType t = prel.getRowType();
RexBuilder b = prel.getCluster().getRexBuilder();
@@ -64,16 +60,24 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
return prel;
}
- for (int i =0; i < projectCount; i++) {
+ for (int i = 0; i < projectCount; i++) {
projections.add(b.makeInputRef(prel, i));
}
return new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, prel.getRowType());
}
+ private Prel addTrivialOrderedProjectPrel(Prel prel, boolean checkNecessity) { // checkNecessity=false always delegates to the one-arg overload (used below Union-All)
+ if(checkNecessity && !prel.needsFinalColumnReordering()) { // caller asked for the check and prel reports no reordering is needed
+ return prel;
+ } else {
+ return addTrivialOrderedProjectPrel(prel); // one-arg overload no longer performs the needsFinalColumnReordering() check itself
+ }
+ }
+
@Override
public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
Prel newChild = ((Prel) prel.getChild()).accept(this, null);
- return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild)));
+ return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild, true))); // true: same conditional behavior as visitScreen
}
@Override
@@ -105,7 +109,7 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
boolean needProjectBelowUnion = !(p instanceof ProjectPrel);
if(needProjectBelowUnion) {
- child = addTrivialOrderedProjectPrel(child);
+ child = addTrivialOrderedProjectPrel(child, false);
}
children.add(child);
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index fcf5c9f..fee1d6a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -25,7 +25,7 @@ import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
import org.junit.Test;
public class TestUnionAll extends BaseTestQuery{
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
@Test // Simple Union-All over two scans
public void testUnionAll1() throws Exception {
@@ -368,4 +368,65 @@ public class TestUnionAll extends BaseTestQuery{
test(query);
}
+
+ @Test // see DRILL-2591: regression test for Union-All schema-change detection
+ public void testDateAndTimestampJson() throws Exception {
+ String rootDate = FileUtils.getResourceAsFile("/store/json/dateData.json").toURI().toString(); // 12 rows of "yyyy-MM-dd" strings
+ String rootTimpStmp = FileUtils.getResourceAsFile("/store/json/timeStmpData.json").toURI().toString(); // 14 rows of "yyyy-MM-dd HH:mm:ss.SSS" strings
+
+ String query = String.format(
+ "(select max(key) as key from dfs_test.`%s` " + // left input collapses to a single row
+ "union all " +
+ "select key from dfs_test.`%s`)", rootDate, rootTimpStmp);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .csvBaselineFile("testframework/testUnionAllQueries/q18.tsv") // expected: 1 left row (2011-07-26) + 14 right rows
+ .baselineTypes(TypeProtos.MinorType.VARCHAR)
+ .baselineColumns("key")
+ .build().run();
+ }
+
+ @Test // see DRILL-2637: count() aggregate in one or both Union-All inputs
+ public void testUnionAllOneInputContainsAggFunction() throws Exception {
+ String root = FileUtils.getResourceAsFile("/multilevel/csv/1994/Q1/orders_94_q1.csv").toURI().toString();
+ String query1 = String.format("select * from ((select count(c1) as ct from (select columns[0] c1 from dfs.`%s`)) \n" + // aggregate in the left input only
+ "union all \n" +
+ "(select columns[0] c2 from dfs.`%s`)) order by ct limit 3", root, root);
+
+ String query2 = String.format("select * from ((select columns[0] ct from dfs.`%s`)\n" + // aggregate in the right input only
+ "union all \n" +
+ "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct limit 3", root, root);
+
+ String query3 = String.format("select * from ((select count(c1) as ct from (select columns[0] c1 from dfs.`%s`))\n" + // aggregate in both inputs
+ "union all \n" +
+ "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct", root, root);
+
+ testBuilder()
+ .sqlQuery(query1)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 10)
+ .baselineValues((long) 66)
+ .baselineValues((long) 99)
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query2)
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues((long) 10)
+ .baselineValues((long) 66)
+ .baselineValues((long) 99)
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query3)
+ .ordered()
+ .baselineColumns("ct") // both inputs yield count(c1) = 10
+ .baselineValues((long) 10)
+ .baselineValues((long) 10)
+ .build().run();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/dateData.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/dateData.json b/exec/java-exec/src/test/resources/store/json/dateData.json
new file mode 100644
index 0000000..d15d9bc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/dateData.json
@@ -0,0 +1,12 @@
+{"key":"2009-03-03"}
+{"key":"2001-08-27"}
+{"key":"2011-07-26"}
+{"key":"1970-09-02"}
+{"key":"1983-04-24"}
+{"key":"2007-02-01"}
+{"key":"1977-08-03"}
+{"key":"1962-05-14"}
+{"key":"1950-02-16"}
+{"key":"1983-09-05"}
+{"key":"2000-09-09"}
+{"key":"1960-08-18"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/timeStmpData.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/timeStmpData.json b/exec/java-exec/src/test/resources/store/json/timeStmpData.json
new file mode 100644
index 0000000..8c150dd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/timeStmpData.json
@@ -0,0 +1,14 @@
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.542"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.543"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
+{"key":"2015-03-26 19:04:55.544"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
new file mode 100644
index 0000000..ccf0d35
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
@@ -0,0 +1,15 @@
+2011-07-26
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
\ No newline at end of file