You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/06/29 00:44:57 UTC

[GitHub] Ben-Zvi closed pull request #1341: DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer

Ben-Zvi closed pull request #1341: DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer
URL: https://github.com/apache/drill/pull/1341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 258e8d0856..d0b036dffd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -516,10 +516,10 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
     logger.trace("Incoming sizer: {}",sizer);
     // An empty batch only has the schema, can not tell actual length of varchars
     // else use the actual varchars length, each capped at 50 (to match the space allocation)
-    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.getStdRowWidth() : sizer.getNetRowWidthCap50();
 
     // Get approx max (varchar) column width to get better memory allocation
-    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.max(sizer.getMaxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
     maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
 
     //
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index e525530b31..d80237c1e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -249,7 +249,7 @@ private void completeABatch(boolean toInitialize, boolean needsSpill) {
       tmpBatchesList.add(currentBatch);
       partitionBatchesCount++;
 
-      long batchSize = new RecordBatchSizer(currentBatch).actualSize();
+      long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
       inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
 
       partitionInMemorySize += batchSize;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index bb0b1adf2a..6c9a39802d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -438,7 +438,7 @@ public long getActualSize() {
         size += ledger.getAccountedSize();
       }
 
-      size += new RecordBatchSizer(htContainer).actualSize();
+      size += new RecordBatchSizer(htContainer).getActualSize();
       return size;
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 2f92d52725..8421d6ee09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -136,7 +136,7 @@ public void update() {
       final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
 
       // Average rowWidth excluding the flatten column.
-      final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn;
+      final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
 
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 23ace36415..735306a4ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -260,14 +260,14 @@ public void addBatch(VectorAccessible incoming) {
     // during the transfer, we immediately follow the transfer with an SV2
     // allocation that will fail if we are over the allocation limit.
 
-    if (isSpillNeeded(sizer.actualSize())) {
+    if (isSpillNeeded(sizer.getActualSize())) {
       spillFromMemory();
     }
 
     // Sanity check. We should now be below the buffer memory maximum.
 
     long startMem = allocator.getAllocatedMemory();
-    bufferedBatches.add(incoming, sizer.netSize());
+    bufferedBatches.add(incoming, sizer.getNetBatchSize());
 
     // Compute batch size, including allocation of an sv2.
 
@@ -276,7 +276,7 @@ public void addBatch(VectorAccessible incoming) {
 
     // Update the minimum buffer space metric.
 
-    metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize());
+    metrics.updateInputMetrics(sizer.rowCount(), sizer.getActualSize());
     metrics.updateMemory(memManager.freeMemory(endMem));
     metrics.updatePeakBatches(bufferedBatches.size());
 
@@ -284,8 +284,8 @@ public void addBatch(VectorAccessible incoming) {
     // the effective count as given by the selection vector
     // (which may exclude some records due to filtering.)
 
-    validateBatchSize(sizer.actualSize(), batchSize);
-    if (memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount())) {
+    validateBatchSize(sizer.getActualSize(), batchSize);
+    if (memManager.updateEstimates((int) batchSize, sizer.getNetRowWidth(), sizer.rowCount())) {
 
       // If estimates changed, discard the helper based on the old estimates.
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 16b06fee84..2ebe887830 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -37,13 +37,14 @@ public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, Record
 
   private int updateInternal(int inputIndex, int outputPosition,  boolean useAggregate) {
     updateIncomingStats(inputIndex);
-    rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize();
+    rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
     final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
 
-    // If outgoing row width is 0, just return. This is possible for empty batches or
+    // If outgoing row width is 0 or there is no change in outgoing row width, just return.
+    // This is possible for empty batches or
     // when first set of batches come with OK_NEW_SCHEMA and no data.
-    if (newOutgoingRowWidth == 0) {
+    if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
       return getOutputRowCount();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 993f3cb63d..a270ced48c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -157,9 +157,9 @@ public void update(int inputIndex) {
   public void update(RecordBatch recordBatch, int index) {
     // Get sizing information for the batch.
     setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
-    setOutgoingRowWidth(getRecordBatchSizer(index).netRowWidth());
+    setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth());
     // Number of rows in outgoing batch
-    setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).netRowWidth());
+    setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth());
     updateIncomingStats(index);
   }
 
@@ -201,6 +201,10 @@ public static int adjustOutputRowCount(int rowCount) {
     return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
   }
 
+  public static int computeRowCount(int batchSize, int rowWidth) {
+    return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
+  }
+
   public void setOutgoingRowWidth(int outgoingRowWidth) {
     this.outgoingRowWidth = outgoingRowWidth;
   }
@@ -249,13 +253,13 @@ public void updateIncomingStats(int index) {
     Preconditions.checkArgument(index >= 0 && index < numInputs);
     Preconditions.checkArgument(inputBatchStats[index] != null);
     inputBatchStats[index].incNumBatches();
-    inputBatchStats[index].incSumBatchSizes(sizer[index].netSize());
+    inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize());
     inputBatchStats[index].incTotalRecords(sizer[index].rowCount());
   }
 
   public void updateIncomingStats() {
     inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches();
-    inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].netSize());
+    inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize());
     inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount());
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index a5cb05b008..b6a294ef6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -26,7 +26,6 @@
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
-import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVector;
@@ -50,11 +49,6 @@
  */
 
 public class RecordBatchSizer {
-  // TODO consolidate common memory estimation helpers
-  public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER;
-  public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER;
-  public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD;
-
   private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
   private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
 
@@ -610,12 +604,6 @@ public ColumnSize getColumn(String name) {
    * Number of records (rows) in the batch.
    */
   private int rowCount;
-  /**
-   * Standard row width using Drill meta-data. Note: this information is
-   * 100% bogus. Do not use it.
-   */
-  @Deprecated
-  private int stdRowWidth;
   /**
    * Actual batch size summing all buffers used to store data
    * for the batch.
@@ -636,9 +624,12 @@ public ColumnSize getColumn(String name) {
   /**
    * actual row size if input is not empty. Otherwise, standard size.
    */
-  private int rowAllocSize;
-  private boolean hasSv2;
+  private int rowAllocWidth;
+  private int stdRowWidth;
+
+  public SelectionVector2 sv2 = null;
   private int sv2Size;
+
   private int avgDensity;
 
   private Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
@@ -691,47 +682,24 @@ public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     for (VectorWrapper<?> vw : va) {
       ColumnSize colSize = measureColumn(vw.getValueVector(), "");
       columnSizes.put(vw.getField().getName(), colSize);
-      stdRowWidth += colSize.getStdDataSizePerEntry();
       netBatchSize += colSize.getTotalNetSize();
       maxSize = Math.max(maxSize, colSize.getTotalDataSize());
       if (colSize.metadata.isNullable()) {
         nullableCount++;
       }
       netRowWidth += colSize.getNetSizePerEntry();
-      rowAllocSize += colSize.getAllocSizePerEntry();
-    }
-
-    for (BufferLedger ledger : ledgers) {
-      accountedMemorySize += ledger.getAccountedSize();
-    }
-
-    if (rowCount > 0) {
-      grossRowWidth = safeDivide(accountedMemorySize, rowCount);
-    }
-
-    if (sv2 != null) {
-      sv2Size = sv2.getBuffer(false).capacity();
-      accountedMemorySize += sv2Size;
-      hasSv2 = true;
     }
-
-    computeEstimates();
-  }
-
-  private void computeEstimates() {
-    grossRowWidth = safeDivide(accountedMemorySize, rowCount);
-    avgDensity = safeDivide(netBatchSize * 100L, accountedMemorySize);
+    this.sv2 = sv2;
   }
 
   public void applySv2() {
-    if (hasSv2) {
+    if (sv2 == null) {
       return;
     }
 
-    hasSv2 = true;
     sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount);
+    avgDensity = safeDivide(netBatchSize * 100L, getActualSize());
     accountedMemorySize += sv2Size;
-    computeEstimates();
   }
 
   /**
@@ -823,10 +791,64 @@ public static int safeDivide(int num, double denom) {
   }
 
   public int rowCount() { return rowCount; }
-  public int stdRowWidth() { return stdRowWidth; }
-  public int grossRowWidth() { return grossRowWidth; }
-  public int netRowWidth() { return netRowWidth; }
-  public int getRowAllocSize() { return rowAllocSize; }
+
+  public int getStdRowWidth() {
+    if (stdRowWidth != 0) {
+      return stdRowWidth;
+    }
+
+    for (ColumnSize columnSize : columnSizes.values()) {
+      stdRowWidth += columnSize.getStdDataSizePerEntry();
+    }
+
+    return stdRowWidth;
+  }
+
+  public int getRowAllocWidth() {
+    if (rowAllocWidth != 0) {
+      return rowAllocWidth;
+    }
+
+    for (ColumnSize columnSize : columnSizes.values()) {
+      rowAllocWidth += columnSize.getAllocSizePerEntry();
+    }
+
+    return rowAllocWidth;
+  }
+
+  public long getActualSize() {
+    if (accountedMemorySize != 0) {
+      return accountedMemorySize;
+    }
+
+    for (BufferLedger ledger : ledgers) {
+      accountedMemorySize += ledger.getAccountedSize();
+    }
+
+    if (sv2 != null) {
+      sv2Size = sv2.getBuffer(false).capacity();
+      accountedMemorySize += sv2Size;
+    }
+
+    return accountedMemorySize;
+  }
+
+  public int getGrossRowWidth() {
+    if (grossRowWidth != 0) {
+      return grossRowWidth;
+    }
+
+    grossRowWidth = safeDivide(getActualSize(), rowCount);
+
+    return grossRowWidth;
+  }
+
+  public int getAvgDensity() {
+    return safeDivide(netBatchSize * 100L, getActualSize());
+  }
+
+
+  public int getNetRowWidth() { return netRowWidth; }
   public Map<String, ColumnSize> columns() { return columnSizes; }
 
   /**
@@ -835,12 +857,10 @@ public static int safeDivide(int num, double denom) {
    * and null marking columns.
    * @return "real" width of the row
    */
-  public int netRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
-  public long actualSize() { return accountedMemorySize; }
-  public boolean hasSv2() { return hasSv2; }
-  public int avgDensity() { return avgDensity; }
-  public long netSize() { return netBatchSize; }
-  public int maxAvgColumnSize() { return maxSize / rowCount; }
+  public int getNetRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
+  public boolean hasSv2() { return sv2 != null; }
+  public long getNetBatchSize() { return netBatchSize; }
+  public int getMaxAvgColumnSize() { return safeDivide(maxSize, rowCount); }
 
   @Override
   public String toString() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index da83b00446..02980ad6c9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -71,7 +71,7 @@ private long getExpectedSize(List<String> expectedJsonBatches) throws ExecutionS
     long totalSize = 0;
     for (VectorAccessible batch : batches) {
       RecordBatchSizer sizer = new RecordBatchSizer(batch);
-      totalSize += sizer.netSize();
+      totalSize += sizer.getNetBatchSize();
     }
     return totalSize;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 051f4b3ba7..e037d02711 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -373,7 +373,7 @@ public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches,
         RecordBatchSizer sizer = new RecordBatchSizer(loader);
         // Not checking actualSize as accounting is not correct when we do
         // split and transfer ownership across operators.
-        Assert.assertTrue(sizer.netSize() <= expectedBatchSize);
+        Assert.assertTrue(sizer.getNetBatchSize() <= expectedBatchSize);
       }
 
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
index 71ca3cf024..297a1c5472 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -52,7 +52,7 @@ public AbstractSingleRowSet(VectorContainer container, TupleMetadata schema) {
   @Override
   public long size() {
     RecordBatchSizer sizer = new RecordBatchSizer(container());
-    return sizer.actualSize();
+    return sizer.getActualSize();
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index a2bc5e8741..f0ebdc073b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -139,6 +139,6 @@ public SingleRowSet toIndirect(Set<Integer> skipIndices) {
   @Override
   public long size() {
     RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
-    return sizer.actualSize();
+    return sizer.getActualSize();
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services