You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/11/22 22:52:08 UTC

[06/12] drill git commit: DRILL-5936: Refactor MergingRecordBatch based on code inspection

DRILL-5936: Refactor MergingRecordBatch based on code inspection

This closes #1025


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36abdd79
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36abdd79
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36abdd79

Branch: refs/heads/master
Commit: 36abdd79fe57a596dc1c508306acefdd6b3163ea
Parents: 23e6565
Author: Vlad Rozov <vr...@apache.org>
Authored: Mon Nov 6 17:55:56 2017 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 22 10:35:06 2017 -0800

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 41 +++++++++-----------
 1 file changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/36abdd79/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a7d3f39..f9ceff2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -101,7 +101,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private MergingReceiverGeneratorBase merger;
   private final MergingReceiverPOP config;
   private boolean hasRun = false;
-  private boolean prevBatchWasFull = false;
+  private boolean outgoingBatchHasSpace = true;
   private boolean hasMoreIncoming = true;
 
   private int outgoingPosition = 0;
@@ -177,11 +177,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
     boolean schemaChanged = false;
 
-    if (prevBatchWasFull) {
+    if (!outgoingBatchHasSpace) {
       logger.debug("Outgoing vectors were full on last iteration");
       allocateOutgoing();
       outgoingPosition = 0;
-      prevBatchWasFull = false;
+      outgoingBatchHasSpace = true;
     }
 
     if (!hasMoreIncoming) {
@@ -398,14 +398,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // finished lazy initialization
     }
 
-    while (!pqueue.isEmpty()) {
-      // pop next value from pq and copy to outgoing batch
-      final Node node = pqueue.peek();
-      if (!copyRecordToOutgoingBatch(node)) {
-        logger.debug("Outgoing vectors space is full; breaking");
-        prevBatchWasFull = true;
+    while (outgoingBatchHasSpace) {
+      // poll next value from pq and copy to outgoing batch
+      final Node node = pqueue.poll();
+      if (node == null) {
+        break;
       }
-      pqueue.poll();
+      outgoingBatchHasSpace = copyRecordToOutgoingBatch(node);
 
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
@@ -448,11 +447,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
           // this batch is empty; since the pqueue no longer references this batch, it will be
           // ignored in subsequent iterations.
-          if (prevBatchWasFull) {
-            break;
-          } else {
-            continue;
-          }
+          continue;
         }
 
         final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
@@ -469,15 +464,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
         // add front value from batch[x] to priority queue
         if (batchLoaders[node.batchId].getRecordCount() != 0) {
-          pqueue.add(new Node(node.batchId, 0));
+          node.valueIndex = 0;
+          pqueue.add(node);
         }
 
       } else {
-        pqueue.add(new Node(node.batchId, node.valueIndex + 1));
-      }
-
-      if (prevBatchWasFull) {
-        break;
+        node.valueIndex++;
+        pqueue.add(node);
       }
 
     }
@@ -786,6 +779,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
    * @param node Reference to the next record to copy from the incoming batches
    */
   private boolean copyRecordToOutgoingBatch(final Node node) {
+    assert outgoingPosition < OUTGOING_BATCH_SIZE
+        : String.format("Outgoing position %d must be less than batch size %d", outgoingPosition, OUTGOING_BATCH_SIZE);
     assert ++outputCounts[node.batchId] <= inputCounts[node.batchId]
         : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
     final int inIndex = (node.batchId << 16) + node.valueIndex;
@@ -794,8 +789,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException(e);
     }
-    outgoingPosition++;
-    if (outgoingPosition == OUTGOING_BATCH_SIZE) {
+    if (++outgoingPosition == OUTGOING_BATCH_SIZE) {
+      logger.debug("Outgoing vectors space is full (batch size {}).", OUTGOING_BATCH_SIZE);
       return false;
     }
     return true;