You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:57 UTC
[19/27] git commit: prevent NPE in recordLoader. still need to handle the last batch correctly.
prevent NPE in recordLoader. still need to handle the last batch correctly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/93121cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/93121cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/93121cbf
Branch: refs/heads/master
Commit: 93121cbf168f63881ad93d126e2fd9306a51f64a
Parents: a136a5b
Author: Ben Becker <be...@gmail.com>
Authored: Sat Aug 10 12:44:59 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700
----------------------------------------------------------------------
.../impl/partitionsender/OutgoingRecordBatch.java | 9 ++++++---
.../impl/partitionsender/PartitionSenderRootExec.java | 11 +++++++++--
2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6847e5a..6eff778 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -79,9 +79,12 @@ public class OutgoingRecordBatch implements RecordBatch {
public void flush() throws SchemaChangeException {
if (recordCount == 0) {
- logger.warn("Attempted to flush an empty record batch");
+ // TODO: recordCount of 0 with isLast causes recordLoader to throw an NPE. Probably
+ // need to send notification rather than an actual batch.
+ logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : ""));
+ return;
}
- logger.debug("Flushing record batch. count is: " + recordCount + ", capacity is " + recordCapacity);
+
final ExecProtos.FragmentHandle handle = context.getHandle();
FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
handle.getQueryId(),
@@ -90,7 +93,7 @@ public class OutgoingRecordBatch implements RecordBatch {
operator.getOppositeMajorFragmentId(),
0,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+ tunnel.sendRecordBatch(statusHandler, context, writableBatch);
// reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly
// referenced by generated code and must not be replaced.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index aa25c96..293a711 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -86,15 +86,20 @@ class PartitionSenderRootExec implements RootExec {
if (incoming.getRecordCount() > 0)
partitioner.partitionBatch(incoming);
- // send all pending batches
try {
- flushOutgoingBatches(true, false);
+ // send any pending batches
+ for (OutgoingRecordBatch batch : outgoing) {
+ batch.setIsLast();
+ batch.flush();
+ }
} catch (SchemaChangeException e) {
incoming.kill();
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
context.fail(e);
return false;
}
+ context.batchesCompleted.inc(1);
+ context.recordsCompleted.inc(incoming.getRecordCount());
return false;
case OK_NEW_SCHEMA:
@@ -111,6 +116,8 @@ class PartitionSenderRootExec implements RootExec {
}
case OK:
partitioner.partitionBatch(incoming);
+ context.batchesCompleted.inc(1);
+ context.recordsCompleted.inc(incoming.getRecordCount());
return true;
case NOT_YET:
default: