You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this version.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/12 10:32:46 UTC

[3/4] drill git commit: DRILL-2476: Added BatchState.STOP in buildSchema() so AbstractRecordBatch returns IterOutcome.STOP

DRILL-2476: Set the batch state to BatchState.STOP in buildSchema() when an upstream returns IterOutcome.STOP, so that AbstractRecordBatch returns IterOutcome.STOP


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eb79e805
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eb79e805
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eb79e805

Branch: refs/heads/master
Commit: eb79e805fcc01280a77e3525cdd2638c9f22b5d1
Parents: 6cc89e9
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed May 6 14:24:26 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/physical/impl/TopN/TopNBatch.java |  1 +
 .../drill/exec/physical/impl/WriterRecordBatch.java     |  4 ----
 .../drill/exec/physical/impl/join/HashJoinBatch.java    |  5 +++++
 .../drill/exec/physical/impl/join/MergeJoinBatch.java   | 12 ++++++++++--
 .../exec/physical/impl/join/NestedLoopJoinBatch.java    |  5 +++++
 .../physical/impl/mergereceiver/MergingRecordBatch.java |  3 ++-
 .../exec/physical/impl/xsort/ExternalSortBatch.java     |  1 +
 7 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c3e70f5..1cf6213 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -119,6 +119,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     super.close();
   }
 
+  @Override
   public void buildSchema() throws SchemaChangeException {
     VectorContainer c = new VectorContainer(oContext);
     IterOutcome outcome = next(incoming);

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 28a99d9..d5d64a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -77,10 +77,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
-  }
-
-  @Override
   public IterOutcome innerNext() {
     if(processed) {
 //      cleanup();

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 56ce0ee..6490251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -172,6 +172,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     leftUpstream = next(left);
     rightUpstream = next(right);
 
+    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+      state = BatchState.STOP;
+      return;
+    }
+
     if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
       state = BatchState.OUT_OF_MEMORY;
       return;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0430f1b..026d79e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -134,7 +134,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       comparator = JoinUtils.checkAndSetComparison(condition, comparator);
     }
     assert comparator != JoinComparator.NONE;
-    areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
+    areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM);
   }
 
   public JoinRelType getJoinType() {
@@ -146,10 +146,18 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     return status.getOutPosition();
   }
 
+  @Override
   public void buildSchema() throws SchemaChangeException {
     status.ensureInitial();
 
-    if (status.getLastLeft() == IterOutcome.OUT_OF_MEMORY || status.getLastRight() == IterOutcome.OUT_OF_MEMORY) {
+    final IterOutcome leftOutcome = status.getLastLeft();
+    final IterOutcome rightOutcome = status.getLastRight();
+    if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
+      state = BatchState.STOP;
+      return;
+    }
+
+    if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
       state = BatchState.OUT_OF_MEMORY;
       return;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4c86f5c..de0d8e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -277,6 +277,11 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       leftUpstream = next(LEFT_INPUT, left);
       rightUpstream = next(RIGHT_INPUT, right);
 
+      if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+        state = BatchState.STOP;
+        return;
+      }
+
       if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
         state = BatchState.OUT_OF_MEMORY;
         return;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 5d990f0..b28b7b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -460,7 +460,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     return outgoingContainer.getSchema();
   }
 
-  public void buildSchema() {
+  @Override
+  public void buildSchema() throws SchemaChangeException {
     // find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
     tempBatchHolder = new RawFragmentBatch[fragProviders.length];
     int i = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 3159811..5cdd2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -171,6 +171,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     super.close();
   }
 
+  @Override
   public void buildSchema() throws SchemaChangeException {
     IterOutcome outcome = next(incoming);
     switch (outcome) {