You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/12 10:32:46 UTC
[3/4] drill git commit: DRILL-2476: Added BatchState.STOP in buildSchema() so AbstractRecordBatch returns IterOutcome.STOP
DRILL-2476: Added BatchState.STOP in buildSchema() so AbstractRecordBatch returns IterOutcome.STOP
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eb79e805
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eb79e805
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eb79e805
Branch: refs/heads/master
Commit: eb79e805fcc01280a77e3525cdd2638c9f22b5d1
Parents: 6cc89e9
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed May 6 14:24:26 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/physical/impl/TopN/TopNBatch.java | 1 +
.../drill/exec/physical/impl/WriterRecordBatch.java | 4 ----
.../drill/exec/physical/impl/join/HashJoinBatch.java | 5 +++++
.../drill/exec/physical/impl/join/MergeJoinBatch.java | 12 ++++++++++--
.../exec/physical/impl/join/NestedLoopJoinBatch.java | 5 +++++
.../physical/impl/mergereceiver/MergingRecordBatch.java | 3 ++-
.../exec/physical/impl/xsort/ExternalSortBatch.java | 1 +
7 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c3e70f5..1cf6213 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -119,6 +119,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
super.close();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
VectorContainer c = new VectorContainer(oContext);
IterOutcome outcome = next(incoming);
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 28a99d9..d5d64a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -77,10 +77,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
}
@Override
- public void buildSchema() throws SchemaChangeException {
- }
-
- @Override
public IterOutcome innerNext() {
if(processed) {
// cleanup();
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 56ce0ee..6490251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -172,6 +172,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
leftUpstream = next(left);
rightUpstream = next(right);
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0430f1b..026d79e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -134,7 +134,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
comparator = JoinUtils.checkAndSetComparison(condition, comparator);
}
assert comparator != JoinComparator.NONE;
- areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
+ areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM);
}
public JoinRelType getJoinType() {
@@ -146,10 +146,18 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
return status.getOutPosition();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
status.ensureInitial();
- if (status.getLastLeft() == IterOutcome.OUT_OF_MEMORY || status.getLastRight() == IterOutcome.OUT_OF_MEMORY) {
+ final IterOutcome leftOutcome = status.getLastLeft();
+ final IterOutcome rightOutcome = status.getLastRight();
+ if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
+ if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4c86f5c..de0d8e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -277,6 +277,11 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
leftUpstream = next(LEFT_INPUT, left);
rightUpstream = next(RIGHT_INPUT, right);
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 5d990f0..b28b7b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -460,7 +460,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return outgoingContainer.getSchema();
}
- public void buildSchema() {
+ @Override
+ public void buildSchema() throws SchemaChangeException {
// find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
tempBatchHolder = new RawFragmentBatch[fragProviders.length];
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 3159811..5cdd2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -171,6 +171,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
super.close();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
IterOutcome outcome = next(incoming);
switch (outcome) {