You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:58 UTC
[20/27] git commit: fix issue with end of batch
fix issue with end of batch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/add8c724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/add8c724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/add8c724
Branch: refs/heads/master
Commit: add8c724b9490feba442ae831d9b9c0d6babe413
Parents: 93121cb
Author: Ben Becker <be...@gmail.com>
Authored: Sun Aug 11 15:12:32 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700
----------------------------------------------------------------------
.../partitionsender/OutgoingRecordBatch.java | 71 +++++++++++++-------
.../PartitionSenderRootExec.java | 13 ++--
2 files changed, 52 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6eff778..927cc75 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,7 +19,9 @@
package org.apache.drill.exec.physical.impl.partitionsender;
import java.util.Iterator;
+import java.util.List;
+import com.beust.jcommander.internal.Lists;
import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -76,34 +78,58 @@ public class OutgoingRecordBatch implements RecordBatch {
public void incRecordCount() {
++recordCount;
}
-
- public void flush() throws SchemaChangeException {
- if (recordCount == 0) {
- // TODO: recordCount of 0 with isLast causes recordLoader to throw an NPE. Probably
- // need to send notification rather than an actual batch.
- logger.warn("Attempted to flush an empty record batch" + (isLast ? " (last batch)" : ""));
- return;
- }
+ /**
+ * Send the record batch to the target node, then reset the value vectors
+ *
+ * @return true if a flush was needed; otherwise false
+ * @throws SchemaChangeException
+ */
+ public boolean flush() throws SchemaChangeException {
+ logger.error("Creating FragmentWritableBatch. IsLast? " + (isLast ? " (last batch)" : ""));
final ExecProtos.FragmentHandle handle = context.getHandle();
- FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
- handle.getQueryId(),
- handle.getMajorFragmentId(),
- handle.getMinorFragmentId(),
- operator.getOppositeMajorFragmentId(),
- 0,
- getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, context, writableBatch);
-
- // reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly
- // referenced by generated code and must not be replaced.
+
+ if (recordCount != 0) {
+ FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+ handle.getQueryId(),
+ handle.getMajorFragmentId(),
+ handle.getMinorFragmentId(),
+ operator.getOppositeMajorFragmentId(),
+ 0,
+ getWritableBatch());
+ tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+ } else {
+ logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
+
+ if (isLast) {
+
+ // if the last batch is empty, it must not contain any value vectors.
+ vectorContainer = new VectorContainer();
+
+ // send final batch
+ FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+ handle.getQueryId(),
+ handle.getMajorFragmentId(),
+ handle.getMinorFragmentId(),
+ operator.getOppositeMajorFragmentId(),
+ 0,
+ getWritableBatch());
+ tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+ return true;
+
+ }
+ }
+
+ // reset values and reallocate the buffer for each value vector based on the incoming batch.
+ // NOTE: the value vector is directly referenced by generated code; therefore references
+ // must remain valid.
recordCount = 0;
for (VectorWrapper v : vectorContainer) {
- logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector());
- getAllocator(v.getValueVector(),
- v.getValueVector()).alloc(recordCapacity);
+ logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
+ getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
}
if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
+ return true;
}
@@ -141,7 +167,6 @@ public class OutgoingRecordBatch implements RecordBatch {
recordCapacity = 0;
for (VectorWrapper v : vectorContainer)
v.getValueVector().clear();
- initializeBatch();
}
public void setIsLast() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/add8c724/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 293a711..b2ca64e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -80,12 +80,8 @@ class PartitionSenderRootExec implements RootExec {
RecordBatch.IterOutcome out = incoming.next();
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
- case STOP:
case NONE:
- // populate outgoing batches
- if (incoming.getRecordCount() > 0)
- partitioner.partitionBatch(incoming);
-
+ case STOP:
try {
// send any pending batches
for (OutgoingRecordBatch batch : outgoing) {
@@ -96,10 +92,7 @@ class PartitionSenderRootExec implements RootExec {
incoming.kill();
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
context.fail(e);
- return false;
}
- context.batchesCompleted.inc(1);
- context.recordsCompleted.inc(incoming.getRecordCount());
return false;
case OK_NEW_SCHEMA:
@@ -249,8 +242,10 @@ class PartitionSenderRootExec implements RootExec {
if (isLastBatch)
batch.setIsLast();
batch.flush();
- if (schemaChanged)
+ if (schemaChanged) {
batch.resetBatch();
+ batch.initializeBatch();
+ }
}
}
}