You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/11/22 22:52:08 UTC
[06/12] drill git commit: DRILL-5936: Refactor MergingRecordBatch
based on code inspection
DRILL-5936: Refactor MergingRecordBatch based on code inspection
This closes #1025
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36abdd79
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36abdd79
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36abdd79
Branch: refs/heads/master
Commit: 36abdd79fe57a596dc1c508306acefdd6b3163ea
Parents: 23e6565
Author: Vlad Rozov <vr...@apache.org>
Authored: Mon Nov 6 17:55:56 2017 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 22 10:35:06 2017 -0800
----------------------------------------------------------------------
.../impl/mergereceiver/MergingRecordBatch.java | 41 +++++++++-----------
1 file changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/36abdd79/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a7d3f39..f9ceff2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -101,7 +101,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private MergingReceiverGeneratorBase merger;
private final MergingReceiverPOP config;
private boolean hasRun = false;
- private boolean prevBatchWasFull = false;
+ private boolean outgoingBatchHasSpace = true;
private boolean hasMoreIncoming = true;
private int outgoingPosition = 0;
@@ -177,11 +177,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
boolean schemaChanged = false;
- if (prevBatchWasFull) {
+ if (!outgoingBatchHasSpace) {
logger.debug("Outgoing vectors were full on last iteration");
allocateOutgoing();
outgoingPosition = 0;
- prevBatchWasFull = false;
+ outgoingBatchHasSpace = true;
}
if (!hasMoreIncoming) {
@@ -398,14 +398,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// finished lazy initialization
}
- while (!pqueue.isEmpty()) {
- // pop next value from pq and copy to outgoing batch
- final Node node = pqueue.peek();
- if (!copyRecordToOutgoingBatch(node)) {
- logger.debug("Outgoing vectors space is full; breaking");
- prevBatchWasFull = true;
+ while (outgoingBatchHasSpace) {
+ // poll next value from pq and copy to outgoing batch
+ final Node node = pqueue.poll();
+ if (node == null) {
+ break;
}
- pqueue.poll();
+ outgoingBatchHasSpace = copyRecordToOutgoingBatch(node);
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
@@ -448,11 +447,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// this batch is empty; since the pqueue no longer references this batch, it will be
// ignored in subsequent iterations.
- if (prevBatchWasFull) {
- break;
- } else {
- continue;
- }
+ continue;
}
final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
@@ -469,15 +464,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// add front value from batch[x] to priority queue
if (batchLoaders[node.batchId].getRecordCount() != 0) {
- pqueue.add(new Node(node.batchId, 0));
+ node.valueIndex = 0;
+ pqueue.add(node);
}
} else {
- pqueue.add(new Node(node.batchId, node.valueIndex + 1));
- }
-
- if (prevBatchWasFull) {
- break;
+ node.valueIndex++;
+ pqueue.add(node);
}
}
@@ -786,6 +779,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
* @param node Reference to the next record to copy from the incoming batches
*/
private boolean copyRecordToOutgoingBatch(final Node node) {
+ assert outgoingPosition < OUTGOING_BATCH_SIZE
+ : String.format("Outgoing position %d must be less than bath size %d", outgoingPosition, OUTGOING_BATCH_SIZE);
assert ++outputCounts[node.batchId] <= inputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
final int inIndex = (node.batchId << 16) + node.valueIndex;
@@ -794,8 +789,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException(e);
}
- outgoingPosition++;
- if (outgoingPosition == OUTGOING_BATCH_SIZE) {
+ if (++outgoingPosition == OUTGOING_BATCH_SIZE) {
+ logger.debug("Outgoing vectors space is full (batch size {}).", OUTGOING_BATCH_SIZE);
return false;
}
return true;