You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 03:18:45 UTC
[37/51] [abbrv] git commit: fix NPE in merging receiver
fix NPE in merging receiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4cfdb3b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4cfdb3b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4cfdb3b6
Branch: refs/heads/master
Commit: 4cfdb3b653ba4db664abc14c4b1d51e4cec5c668
Parents: 0b1df5d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Apr 4 02:51:42 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 19 18:07:11 2014 -0700
----------------------------------------------------------------------
.../impl/mergereceiver/MergingRecordBatch.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4cfdb3b6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index c5c77a6..dcfe02f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -236,22 +236,27 @@ public class MergingRecordBatch implements RecordBatch {
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
+ RawFragmentBatch nextBatch = null;
try {
- incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+ nextBatch = fragProviders[node.batchId].getNext();
+
+ while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
+ nextBatch = fragProviders[node.batchId].getNext();
+ }
} catch (IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
- if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
- incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {
+ incomingBatches[node.batchId] = nextBatch;
+
+ if (nextBatch == null) {
// batch is empty
- incomingBatches[node.batchId].release();
boolean allBatchesEmpty = true;
for (RawFragmentBatch batch : incomingBatches) {
// see if all batches are empty so we can return OK_* or NONE
- if (!batch.getHeader().getIsLastBatch()) {
+ if (batch != null) {
allBatchesEmpty = false;
break;
}