You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 03:18:45 UTC

[37/51] [abbrv] git commit: fix NPE in merging receiver

fix NPE in merging receiver


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4cfdb3b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4cfdb3b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4cfdb3b6

Branch: refs/heads/master
Commit: 4cfdb3b653ba4db664abc14c4b1d51e4cec5c668
Parents: 0b1df5d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Apr 4 02:51:42 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 19 18:07:11 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java       | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4cfdb3b6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index c5c77a6..dcfe02f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -236,22 +236,27 @@ public class MergingRecordBatch implements RecordBatch {
 
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
+        RawFragmentBatch nextBatch = null;
         try {
-          incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+          nextBatch = fragProviders[node.batchId].getNext();
+
+          while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
+            nextBatch = fragProviders[node.batchId].getNext();
+          }
         } catch (IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
 
-        if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
-            incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {
+        incomingBatches[node.batchId] = nextBatch;
+
+        if (nextBatch == null) {
           // batch is empty
-          incomingBatches[node.batchId].release();
           boolean allBatchesEmpty = true;
 
           for (RawFragmentBatch batch : incomingBatches) {
             // see if all batches are empty so we can return OK_* or NONE
-            if (!batch.getHeader().getIsLastBatch()) {
+            if (batch != null) {
               allBatchesEmpty = false;
               break;
             }