You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:58 UTC

[20/27] git commit: fix issue with end of batch

fix issue with end of batch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/add8c724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/add8c724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/add8c724

Branch: refs/heads/master
Commit: add8c724b9490feba442ae831d9b9c0d6babe413
Parents: 93121cb
Author: Ben Becker <be...@gmail.com>
Authored: Sun Aug 11 15:12:32 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 71 +++++++++++++-------
 .../PartitionSenderRootExec.java                | 13 ++--
 2 files changed, 52 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6eff778..927cc75 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,7 +19,9 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.util.Iterator;
+import java.util.List;
 
+import com.beust.jcommander.internal.Lists;
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -76,34 +78,58 @@ public class OutgoingRecordBatch implements RecordBatch {
   public void incRecordCount() {
     ++recordCount;
   }
-  
-  public void flush() throws SchemaChangeException {
-    if (recordCount == 0) {
-      // TODO:  recordCount of 0 with isLast causes recordLoader to throw an NPE.  Probably
-      //        need to send notification rather than an actual batch.
-      logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : ""));
-      return;
-    }
 
+  /**
+   * Send the record batch to the target node, then reset the value vectors
+   * 
+   * @return true if a flush was needed; otherwise false
+   * @throws SchemaChangeException
+   */
+  public boolean flush() throws SchemaChangeException {
+    logger.error("Creating FragmentWritableBatch.  IsLast? " + (isLast ? " (last batch)" : ""));
     final ExecProtos.FragmentHandle handle = context.getHandle();
-    FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
-                                                                    handle.getQueryId(),
-                                                                    handle.getMajorFragmentId(),
-                                                                    handle.getMinorFragmentId(),
-                                                                    operator.getOppositeMajorFragmentId(),
-                                                                    0,
-                                                                    getWritableBatch());
-    tunnel.sendRecordBatch(statusHandler, context, writableBatch);
-
-    // reset values and reallocate the buffer for each value vector.  NOTE: the value vector is directly
-    // referenced by generated code and must not be replaced.
+
+    if (recordCount != 0) {
+      FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+                                                                      handle.getQueryId(),
+                                                                      handle.getMajorFragmentId(),
+                                                                      handle.getMinorFragmentId(),
+                                                                      operator.getOppositeMajorFragmentId(),
+                                                                      0,
+                                                                      getWritableBatch());
+      tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+    } else {
+      logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
+
+      if (isLast) {
+
+        // if the last batch is empty, it must not contain any value vectors.
+        vectorContainer = new VectorContainer();
+
+        // send final batch
+        FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+                                                                        handle.getQueryId(),
+                                                                        handle.getMajorFragmentId(),
+                                                                        handle.getMinorFragmentId(),
+                                                                        operator.getOppositeMajorFragmentId(),
+                                                                        0,
+                                                                        getWritableBatch());
+        tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+        return true;
+
+      }
+    }
+
+    // reset values and reallocate the buffer for each value vector based on the incoming batch.
+    // NOTE: the value vector is directly referenced by generated code; therefore references
+    // must remain valid.
     recordCount = 0;
     for (VectorWrapper v : vectorContainer) {
-      logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector());
-      getAllocator(v.getValueVector(),
-                   v.getValueVector()).alloc(recordCapacity);
+      logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
+      getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
+    return true;
   }
 
 
@@ -141,7 +167,6 @@ public class OutgoingRecordBatch implements RecordBatch {
     recordCapacity = 0;
     for (VectorWrapper v : vectorContainer)
       v.getValueVector().clear();
-    initializeBatch();
   }
 
   public void setIsLast() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 293a711..b2ca64e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -80,12 +80,8 @@ class PartitionSenderRootExec implements RootExec {
     RecordBatch.IterOutcome out = incoming.next();
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     switch(out){
-      case STOP:
       case NONE:
-        // populate outgoing batches
-        if (incoming.getRecordCount() > 0)
-          partitioner.partitionBatch(incoming);
-
+      case STOP:
         try {
           // send any pending batches
           for (OutgoingRecordBatch batch : outgoing) {
@@ -96,10 +92,7 @@ class PartitionSenderRootExec implements RootExec {
           incoming.kill();
           logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
           context.fail(e);
-          return false;
         }
-        context.batchesCompleted.inc(1);
-        context.recordsCompleted.inc(incoming.getRecordCount());
         return false;
 
       case OK_NEW_SCHEMA:
@@ -249,8 +242,10 @@ class PartitionSenderRootExec implements RootExec {
       if (isLastBatch)
         batch.setIsLast();
       batch.flush();
-      if (schemaChanged)
+      if (schemaChanged) {
         batch.resetBatch();
+        batch.initializeBatch();
+      }
     }
   }
 }