You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/28 04:58:32 UTC

[5/5] drill git commit: DRILL-2083: Fix bug in merging receiver

DRILL-2083: Fix bug in merging receiver


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57a96d20
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57a96d20
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57a96d20

Branch: refs/heads/master
Commit: 57a96d200e12c0efcad3f3ca9d935c42647234b1
Parents: 6878bfd
Author: Steven Phillips <sm...@apache.org>
Authored: Mon Apr 20 15:12:57 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon Apr 27 14:12:13 2015 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 35 ++++++++++++++------
 .../impl/mergereceiver/TestMergingReceiver.java | 28 ++++++++--------
 .../test/resources/mergerecv/empty_batch.json   |  2 +-
 .../resources/mergerecv/merging_receiver.json   |  4 +--
 .../resources/mergerecv/multiple_providers.json | 10 +++---
 5 files changed, 46 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3cf2857..40cbc89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -112,7 +112,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
   private RawFragmentBatch emptyBatch = null;
-  private RawFragmentBatch[] tempBatchHolder; //
+  private RawFragmentBatch[] tempBatchHolder;
+  private long[] inputCounts;
+  private long[] outputCounts;
 
   public static enum Metric implements MetricDef{
     BYTES_RECEIVED,
@@ -135,15 +137,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.outgoingContainer = new VectorContainer(oContext);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
+    this.inputCounts = new long[config.getNumSenders()];
+    this.outputCounts = new long[config.getNumSenders()];
   }
 
-  private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
+  private RawFragmentBatch getNext(final int providerIndex) throws IOException{
     stats.startWait();
+    final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
       final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+        inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
       }
       return b;
     } finally {
@@ -186,9 +192,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatch = tempBatchHolder[p];
             tempBatchHolder[p] = null;
           } else {
-            rawBatch = getNext(provider);
+            rawBatch = getNext(p);
           }
-          p++;
           if (rawBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
@@ -204,7 +209,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             emptyBatch = rawBatch;
           }
           try {
-            while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+            while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
               ;
             }
             if (rawBatch == null && !context.shouldContinue()) {
@@ -220,6 +225,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatches.add(emptyBatch);
           }
         }
+        p++;
       }
 
       // allocate the incoming record batch loaders
@@ -304,7 +310,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       for (int b = 0; b < senderCount; ++b) {
         while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
           try {
-            final RawFragmentBatch batch = getNext(fragProviders[b]);
+            final RawFragmentBatch batch = getNext(b);
             incomingBatches[b] = batch;
             if (batch != null) {
               batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
@@ -335,7 +341,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       if (!copyRecordToOutgoingBatch(node)) {
         logger.debug("Outgoing vectors space is full; breaking");
         prevBatchWasFull = true;
-        break;
       }
       pqueue.poll();
 
@@ -349,11 +354,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         // reached the end of an incoming record batch
         RawFragmentBatch nextBatch = null;
         try {
-          nextBatch = getNext(fragProviders[node.batchId]);
+          nextBatch = getNext(node.batchId);
 
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
-            nextBatch = getNext(fragProviders[node.batchId]);
+            nextBatch = getNext(node.batchId);
           }
+          assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
+              : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
           if (nextBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
@@ -383,7 +390,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
           // this batch is empty; since the pqueue no longer references this batch, it will be
           // ignored in subsequent iterations.
-          continue;
+          if (prevBatchWasFull) {
+            break;
+          } else {
+            continue;
+          }
         }
 
         final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
@@ -447,7 +458,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           state = BatchState.DONE;
           return;
         }
-        final RawFragmentBatch batch = getNext(fragProviders[i]);
+        final RawFragmentBatch batch = getNext(i);
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
           continue;
@@ -661,6 +672,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
    * @param node Reference to the next record to copy from the incoming batches
    */
   private boolean copyRecordToOutgoingBatch(final Node node) {
+    assert ++outputCounts[node.batchId] <= inputCounts[node.batchId]
+        : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
     final int inIndex = (node.batchId << 16) + node.valueIndex;
     merger.doCopy(inIndex, outgoingPosition);
     outgoingPosition++;

http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
index cf9dd84..0122c08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
@@ -66,21 +66,21 @@ public class TestMergingReceiver extends PopUnitTestBase {
           }
           for (Object cell : row) {
             if (cell == null) {
-              System.out.print("<null>    ");
+//              System.out.print("<null>    ");
               continue;
             }
             int len = cell.toString().length();
-            System.out.print(cell + " ");
+//            System.out.print(cell + " ");
             for (int i = 0; i < (30 - len); ++i) {
-              System.out.print(" ");
+//              System.out.print(" ");
             }
           }
-          System.out.println();
+//          System.out.println();
         }
         b.release();
         batchLoader.clear();
       }
-      assertEquals(200, count);
+      assertEquals(200000, count);
     }
   }
 
@@ -122,17 +122,17 @@ public class TestMergingReceiver extends PopUnitTestBase {
           }
           for (Object cell : row) {
             int len = cell.toString().length();
-            System.out.print(cell + " ");
+//            System.out.print(cell + " ");
             for (int i = 0; i < (30 - len); ++i) {
-              System.out.print(" ");
+//              System.out.print(" ");
             }
           }
-          System.out.println();
+//          System.out.println();
         }
         b.release();
         batchLoader.clear();
       }
-      assertEquals(400, count);
+      assertEquals(400000, count);
     }
   }
 
@@ -163,21 +163,21 @@ public class TestMergingReceiver extends PopUnitTestBase {
           }
           for (Object cell : row) {
             if (cell == null) {
-              System.out.print("<null>    ");
+//              System.out.print("<null>    ");
               continue;
             }
             int len = cell.toString().length();
-            System.out.print(cell + " ");
+//            System.out.print(cell + " ");
             for (int i = 0; i < (30 - len); ++i) {
-              System.out.print(" ");
+//              System.out.print(" ");
             }
           }
-          System.out.println();
+//          System.out.println();
         }
         b.release();
         batchLoader.clear();
       }
-      assertEquals(100, count);
+      assertEquals(100000, count);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
index 55b3f7d..c954827 100644
--- a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
+++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
@@ -12,7 +12,7 @@
       pop:"mock-scan",
       url: "http://apache.org",
       entries:[
-        {records: 100, types: [
+        {records: 100000, types: [
           {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
           {name: "red", type: "BIGINT", mode: "OPTIONAL"},
           {name: "green", type: "BIGINT", mode: "OPTIONAL"}

http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json b/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
index 50a66f5..5a0166a 100644
--- a/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
+++ b/exec/java-exec/src/test/resources/mergerecv/merging_receiver.json
@@ -12,12 +12,12 @@
       pop:"mock-scan",
       url: "http://apache.org",
       entries:[
-        {records: 100, types: [
+        {records: 100000, types: [
           {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
           {name: "red", type: "BIGINT", mode: "OPTIONAL"},
           {name: "green", type: "BIGINT", mode: "OPTIONAL"}
         ]},
-        {records: 100, types: [
+        {records: 100000, types: [
           {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
           {name: "red", type: "BIGINT", mode: "OPTIONAL"},
           {name: "green", type: "BIGINT", mode: "OPTIONAL"}

http://git-wip-us.apache.org/repos/asf/drill/blob/57a96d20/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json b/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
index a02fe3f..b32de5c 100644
--- a/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
+++ b/exec/java-exec/src/test/resources/mergerecv/multiple_providers.json
@@ -12,27 +12,27 @@
       pop:"mock-scan",
       url: "http://apache.org",
       entries:[
-        {records: 100, types: [
+        {records: 100000, types: [
           {name: "blue", type: "BIGINT", mode: "REQUIRED"},
           {name: "red", type: "BIGINT", mode: "REQUIRED"},
           {name: "green", type: "BIGINT", mode: "REQUIRED"}
         ]},
-        {records: 90, types: [
+        {records: 90000, types: [
           {name: "blue", type: "BIGINT", mode: "REQUIRED"},
           {name: "red", type: "BIGINT", mode: "REQUIRED"},
           {name: "green", type: "BIGINT", mode: "REQUIRED"}
         ]},
-        {records: 80, types: [
+        {records: 80000, types: [
           {name: "blue", type: "BIGINT", mode: "REQUIRED"},
           {name: "red", type: "BIGINT", mode: "REQUIRED"},
           {name: "green", type: "BIGINT", mode: "REQUIRED"}
         ]},
-        {records: 70, types: [
+        {records: 70000, types: [
           {name: "blue", type: "BIGINT", mode: "REQUIRED"},
           {name: "red", type: "BIGINT", mode: "REQUIRED"},
           {name: "green", type: "BIGINT", mode: "REQUIRED"}
         ]},
-        {records: 60, types: [
+        {records: 60000, types: [
           {name: "blue", type: "BIGINT", mode: "REQUIRED"},
           {name: "red", type: "BIGINT", mode: "REQUIRED"},
           {name: "green", type: "BIGINT", mode: "REQUIRED"}