You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/04/11 01:31:21 UTC
drill git commit: DRILL-2612: Union-All can work even when the right input side comes from an empty data source
Repository: drill
Updated Branches:
refs/heads/master 38eb383a9 -> 093626e33
DRILL-2612: Union-All can work even when the right input side comes from an empty data source
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/093626e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/093626e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/093626e3
Branch: refs/heads/master
Commit: 093626e33671510eb3952a09f6fefcb33bc90f18
Parents: 38eb383
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Mon Mar 30 10:43:49 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Apr 10 16:28:20 2015 -0700
----------------------------------------------------------------------
.../impl/union/UnionAllRecordBatch.java | 79 +++++++++++++++++---
.../java/org/apache/drill/TestUnionAll.java | 21 ++++++
2 files changed, 90 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/093626e3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index b472890..52b1794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -283,6 +283,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
private OneSideInput rightSide;
private IterOutcome upstream = IterOutcome.NOT_YET;
private boolean leftIsFinish = false;
+ private boolean rightIsFinish = false;
// These two schemas are obtained from the first record batches of the left and right inputs
// They are used to check if the schema is changed between recordbatches
@@ -298,31 +299,63 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
public IterOutcome nextBatch() throws SchemaChangeException {
if(upstream == RecordBatch.IterOutcome.NOT_YET) {
IterOutcome iterLeft = leftSide.nextBatch();
- if(iterLeft != IterOutcome.OK_NEW_SCHEMA) {
- upstream = iterLeft;
- return upstream;
+ switch(iterLeft) {
+ case OK_NEW_SCHEMA:
+ break;
+
+ case STOP:
+ case OUT_OF_MEMORY:
+ return iterLeft;
+
+ case NONE:
+ throw new SchemaChangeException("The left input of Union-All should not come from an empty data source");
+
+ default:
+ throw new IllegalStateException(String.format("Unknown state %s.", iterLeft));
}
IterOutcome iterRight = rightSide.nextBatch();
- if(iterRight != IterOutcome.OK_NEW_SCHEMA) {
- upstream = iterRight;
- return upstream;
+ switch(iterRight) {
+ case OK_NEW_SCHEMA:
+ // Unless there is no record batch on the left side of the inputs,
+ // always start processing from the left side
+ unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+ inferOutputFields();
+ break;
+
+ case NONE:
+ // If the right input side comes from an empty data source,
+ // use the left input side's schema directly
+ unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+ inferOutputFieldsFromLeftSide();
+ rightIsFinish = true;
+ break;
+
+ case STOP:
+ case OUT_OF_MEMORY:
+ return iterRight;
+
+ default:
+ throw new IllegalStateException(String.format("Unknown state %s.", iterRight));
}
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- inferOutputFields();
upstream = IterOutcome.OK_NEW_SCHEMA;
return upstream;
} else {
unionAllRecordBatch.clearCurrentRecordBatch();
- if(leftIsFinish) {
+ if(leftIsFinish && rightIsFinish) {
+ upstream = IterOutcome.NONE;
+ return upstream;
+ } else if(leftIsFinish) {
IterOutcome iterOutcome = rightSide.nextBatch();
switch(iterOutcome) {
+ case NONE:
+ rightIsFinish = true;
+ // fall through
case STOP:
case OUT_OF_MEMORY:
- case NONE:
upstream = iterOutcome;
return upstream;
@@ -340,6 +373,23 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
default:
throw new IllegalStateException(String.format("Unknown state %s.", upstream));
}
+ } else if(rightIsFinish) {
+ IterOutcome iterOutcome = leftSide.nextBatch();
+ switch(iterOutcome) {
+ case STOP:
+ case OUT_OF_MEMORY:
+ case NONE:
+ upstream = iterOutcome;
+ return upstream;
+
+ case OK:
+ unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+ upstream = iterOutcome;
+ return upstream;
+
+ default:
+ throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+ }
} else {
IterOutcome iterOutcome = leftSide.nextBatch();
@@ -425,6 +475,15 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
}
+ private void inferOutputFieldsFromLeftSide() {
+ outputFields = Lists.newArrayList();
+ Iterator<MaterializedField> iter = leftSide.getRecordBatch().getSchema().iterator();
+ while(iter.hasNext()) {
+ MaterializedField field = iter.next();
+ outputFields.add(MaterializedField.create(field.getPath(), field.getType()));
+ }
+ }
+
public List<MaterializedField> getOutputFields() {
if(outputFields == null) {
throw new NullPointerException("Output fields have not been inferred");
http://git-wip-us.apache.org/repos/asf/drill/blob/093626e3/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index d31cd19..bcffe0c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -478,4 +478,25 @@ public class TestUnionAll extends BaseTestQuery{
.baselineValues((long) 4)
.build().run();
}
+
+ @Test // see DRILL-2612
+ public void testUnionAllRightEmptyJson() throws Exception {
+ String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+ String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+ String queryRightEmpty = String.format(
+ "select key from dfs_test.`%s` " +
+ "union all " +
+ "select key from dfs_test.`%s`",
+ rootSimple,
+ rootEmpty);
+
+ testBuilder()
+ .sqlQuery(queryRightEmpty)
+ .unOrdered()
+ .baselineColumns("key")
+ .baselineValues(true)
+ .baselineValues(false)
+ .build().run();
+ }
}
\ No newline at end of file