You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/28 04:58:32 UTC
[5/5] drill git commit: DRILL-2083: Fix bug in merging receiver
DRILL-2083: Fix bug in merging receiver
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57a96d20
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57a96d20
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57a96d20
Branch: refs/heads/master
Commit: 57a96d200e12c0efcad3f3ca9d935c42647234b1
Parents: 6878bfd
Author: Steven Phillips <sm...@apache.org>
Authored: Mon Apr 20 15:12:57 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon Apr 27 14:12:13 2015 -0700
----------------------------------------------------------------------
.../impl/mergereceiver/MergingRecordBatch.java | 35 ++++++++++++++------
.../impl/mergereceiver/TestMergingReceiver.java | 28 ++++++++--------
.../test/resources/mergerecv/empty_batch.json | 2 +-
.../resources/mergerecv/merging_receiver.json | 4 +--
.../resources/mergerecv/multiple_providers.json | 10 +++---
5 files changed, 46 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3cf2857..40cbc89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -112,7 +112,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private int[] batchOffsets;
private PriorityQueue <Node> pqueue;
private RawFragmentBatch emptyBatch = null;
- private RawFragmentBatch[] tempBatchHolder; //
+ private RawFragmentBatch[] tempBatchHolder;
+ private long[] inputCounts;
+ private long[] outputCounts;
public static enum Metric implements MetricDef{
BYTES_RECEIVED,
@@ -135,15 +137,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.outgoingContainer = new VectorContainer(oContext);
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
this.config = config;
+ this.inputCounts = new long[config.getNumSenders()];
+ this.outputCounts = new long[config.getNumSenders()];
}
- private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
+ private RawFragmentBatch getNext(final int providerIndex) throws IOException{
stats.startWait();
+ final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try {
final RawFragmentBatch b = provider.getNext();
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
}
return b;
} finally {
@@ -186,9 +192,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatch = tempBatchHolder[p];
tempBatchHolder[p] = null;
} else {
- rawBatch = getNext(provider);
+ rawBatch = getNext(p);
}
- p++;
if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
@@ -204,7 +209,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
emptyBatch = rawBatch;
}
try {
- while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+ while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
;
}
if (rawBatch == null && !context.shouldContinue()) {
@@ -220,6 +225,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatches.add(emptyBatch);
}
}
+ p++;
}
// allocate the incoming record batch loaders
@@ -304,7 +310,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
for (int b = 0; b < senderCount; ++b) {
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
try {
- final RawFragmentBatch batch = getNext(fragProviders[b]);
+ final RawFragmentBatch batch = getNext(b);
incomingBatches[b] = batch;
if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
@@ -335,7 +341,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
if (!copyRecordToOutgoingBatch(node)) {
logger.debug("Outgoing vectors space is full; breaking");
prevBatchWasFull = true;
- break;
}
pqueue.poll();
@@ -349,11 +354,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// reached the end of an incoming record batch
RawFragmentBatch nextBatch = null;
try {
- nextBatch = getNext(fragProviders[node.batchId]);
+ nextBatch = getNext(node.batchId);
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
- nextBatch = getNext(fragProviders[node.batchId]);
+ nextBatch = getNext(node.batchId);
}
+ assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
+ : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
if (nextBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
@@ -383,7 +390,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// this batch is empty; since the pqueue no longer references this batch, it will be
// ignored in subsequent iterations.
- continue;
+ if (prevBatchWasFull) {
+ break;
+ } else {
+ continue;
+ }
}
final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
@@ -447,7 +458,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
state = BatchState.DONE;
return;
}
- final RawFragmentBatch batch = getNext(fragProviders[i]);
+ final RawFragmentBatch batch = getNext(i);
if (batch.getHeader().getDef().getFieldCount() == 0) {
i++;
continue;
@@ -661,6 +672,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
* @param node Reference to the next record to copy from the incoming batches
*/
private boolean copyRecordToOutgoingBatch(final Node node) {
+ assert ++outputCounts[node.batchId] <= inputCounts[node.batchId]
+ : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
final int inIndex = (node.batchId << 16) + node.valueIndex;
merger.doCopy(inIndex, outgoingPosition);
outgoingPosition++;
http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
index cf9dd84..0122c08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
@@ -66,21 +66,21 @@ public class TestMergingReceiver extends PopUnitTestBase {
}
for (Object cell : row) {
if (cell == null) {
- System.out.print("<null> ");
+// System.out.print("<null> ");
continue;
}
int len = cell.toString().length();
- System.out.print(cell + " ");
+// System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) {
- System.out.print(" ");
+// System.out.print(" ");
}
}
- System.out.println();
+// System.out.println();
}
b.release();
batchLoader.clear();
}
- assertEquals(200, count);
+ assertEquals(200000, count);
}
}
@@ -122,17 +122,17 @@ public class TestMergingReceiver extends PopUnitTestBase {
}
for (Object cell : row) {
int len = cell.toString().length();
- System.out.print(cell + " ");
+// System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) {
- System.out.print(" ");
+// System.out.print(" ");
}
}
- System.out.println();
+// System.out.println();
}
b.release();
batchLoader.clear();
}
- assertEquals(400, count);
+ assertEquals(400000, count);
}
}
@@ -163,21 +163,21 @@ public class TestMergingReceiver extends PopUnitTestBase {
}
for (Object cell : row) {
if (cell == null) {
- System.out.print("<null> ");
+// System.out.print("<null> ");
continue;
}
int len = cell.toString().length();
- System.out.print(cell + " ");
+// System.out.print(cell + " ");
for (int i = 0; i < (30 - len); ++i) {
- System.out.print(" ");
+// System.out.print(" ");
}
}
- System.out.println();
+// System.out.println();
}
b.release();
batchLoader.clear();
}
- assertEquals(100, count);
+ assertEquals(100000, count);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
index 55b3f7d..c954827 100644
--- a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
+++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
@@ -12,7 +12,7 @@
pop:"mock-scan",
url: "http://apache.org",
entries:[
- {records: 100, types: [
+ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"}
http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json b/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
index 50a66f5..5a0166a 100644
--- a/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
+++ b/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
@@ -12,12 +12,12 @@
pop:"mock-scan",
url: "http://apache.org",
entries:[
- {records: 100, types: [
+ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"}
]},
- {records: 100, types: [
+ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "OPTIONAL"},
{name: "red", type: "BIGINT", mode: "OPTIONAL"},
{name: "green", type: "BIGINT", mode: "OPTIONAL"}
http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json b/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
index a02fe3f..b32de5c 100644
--- a/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
+++ b/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
@@ -12,27 +12,27 @@
pop:"mock-scan",
url: "http://apache.org",
entries:[
- {records: 100, types: [
+ {records: 100000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"}
]},
- {records: 90, types: [
+ {records: 90000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"}
]},
- {records: 80, types: [
+ {records: 80000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"}
]},
- {records: 70, types: [
+ {records: 70000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"}
]},
- {records: 60, types: [
+ {records: 60000, types: [
{name: "blue", type: "BIGINT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "BIGINT", mode: "REQUIRED"}