You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:57 UTC

[19/27] git commit: Prevent NPE in recordLoader. Still need to handle the last batch correctly.

Prevent an NPE in recordLoader by no longer sending empty record batches from flush(). Still need to handle the last batch correctly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/93121cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/93121cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/93121cbf

Branch: refs/heads/master
Commit: 93121cbf168f63881ad93d126e2fd9306a51f64a
Parents: a136a5b
Author: Ben Becker <be...@gmail.com>
Authored: Sat Aug 10 12:44:59 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700

----------------------------------------------------------------------
 .../impl/partitionsender/OutgoingRecordBatch.java        |  9 ++++++---
 .../impl/partitionsender/PartitionSenderRootExec.java    | 11 +++++++++--
 2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6847e5a..6eff778 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -79,9 +79,12 @@ public class OutgoingRecordBatch implements RecordBatch {
   
   public void flush() throws SchemaChangeException {
     if (recordCount == 0) {
-      logger.warn("Attempted to flush an empty record batch");
+      // TODO:  recordCount of 0 with isLast causes recordLoader to throw an NPE.  Probably
+      //        need to send notification rather than an actual batch.
+      logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : ""));
+      return;
     }
-    logger.debug("Flushing record batch.  count is: " + recordCount + ", capacity is " + recordCapacity);
+
     final ExecProtos.FragmentHandle handle = context.getHandle();
     FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
                                                                     handle.getQueryId(),
@@ -90,7 +93,7 @@ public class OutgoingRecordBatch implements RecordBatch {
                                                                     operator.getOppositeMajorFragmentId(),
                                                                     0,
                                                                     getWritableBatch());
-     tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+    tunnel.sendRecordBatch(statusHandler, context, writableBatch);
 
     // reset values and reallocate the buffer for each value vector.  NOTE: the value vector is directly
     // referenced by generated code and must not be replaced.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93121cbf/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index aa25c96..293a711 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -86,15 +86,20 @@ class PartitionSenderRootExec implements RootExec {
         if (incoming.getRecordCount() > 0)
           partitioner.partitionBatch(incoming);
 
-        // send all pending batches
         try {
-          flushOutgoingBatches(true, false);
+          // send any pending batches
+          for (OutgoingRecordBatch batch : outgoing) {
+            batch.setIsLast();
+            batch.flush();
+          }
         } catch (SchemaChangeException e) {
           incoming.kill();
           logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
           context.fail(e);
           return false;
         }
+        context.batchesCompleted.inc(1);
+        context.recordsCompleted.inc(incoming.getRecordCount());
         return false;
 
       case OK_NEW_SCHEMA:
@@ -111,6 +116,8 @@ class PartitionSenderRootExec implements RootExec {
         }
       case OK:
         partitioner.partitionBatch(incoming);
+        context.batchesCompleted.inc(1);
+        context.recordsCompleted.inc(incoming.getRecordCount());
         return true;
       case NOT_YET:
       default: