You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/06/29 00:44:43 UTC

[drill] 01/02: DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 02255d8573497cb88e19624ea0054d0121498646
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Mon Jun 25 19:06:51 2018 -0700

    DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer
    
    closes #1341
---
 .../physical/impl/aggregate/HashAggTemplate.java   |   4 +-
 .../exec/physical/impl/common/HashPartition.java   |   2 +-
 .../physical/impl/common/HashTableTemplate.java    |   2 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |   2 +-
 .../exec/physical/impl/xsort/managed/SortImpl.java |  10 +-
 .../drill/exec/record/JoinBatchMemoryManager.java  |   7 +-
 .../exec/record/RecordBatchMemoryManager.java      |  12 ++-
 .../apache/drill/exec/record/RecordBatchSizer.java | 120 ++++++++++++---------
 .../exec/physical/unit/TestOutputBatchSize.java    |   2 +-
 .../org/apache/drill/test/DrillTestWrapper.java    |   2 +-
 .../drill/test/rowSet/AbstractSingleRowSet.java    |   2 +-
 .../apache/drill/test/rowSet/IndirectRowSet.java   |   2 +-
 12 files changed, 96 insertions(+), 71 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 320f296..3b50471 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -517,10 +517,10 @@ public abstract class HashAggTemplate implements HashAggregator {
     logger.trace("Incoming sizer: {}",sizer);
     // An empty batch only has the schema, can not tell actual length of varchars
     // else use the actual varchars length, each capped at 50 (to match the space allocation)
-    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.getStdRowWidth() : sizer.getNetRowWidthCap50();
 
     // Get approx max (varchar) column width to get better memory allocation
-    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.max(sizer.getMaxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
     maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
 
     //
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index e525530..d80237c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -249,7 +249,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
       tmpBatchesList.add(currentBatch);
       partitionBatchesCount++;
 
-      long batchSize = new RecordBatchSizer(currentBatch).actualSize();
+      long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
       inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
 
       partitionInMemorySize += batchSize;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index bb0b1ad..6c9a398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -438,7 +438,7 @@ public abstract class HashTableTemplate implements HashTable {
         size += ledger.getAccountedSize();
       }
 
-      size += new RecordBatchSizer(htContainer).actualSize();
+      size += new RecordBatchSizer(htContainer).getActualSize();
       return size;
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 2f92d52..8421d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -136,7 +136,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
 
       // Average rowWidth excluding the flatten column.
-      final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn;
+      final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
 
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 55a20bd..03fb751 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -298,14 +298,14 @@ public class SortImpl {
     // during the transfer, we immediately follow the transfer with an SV2
     // allocation that will fail if we are over the allocation limit.
 
-    if (isSpillNeeded(sizer.actualSize())) {
+    if (isSpillNeeded(sizer.getActualSize())) {
       spillFromMemory();
     }
 
     // Sanity check. We should now be below the buffer memory maximum.
 
     long startMem = allocator.getAllocatedMemory();
-    bufferedBatches.add(incoming, sizer.netSize());
+    bufferedBatches.add(incoming, sizer.getNetBatchSize());
 
     // Compute batch size, including allocation of an sv2.
 
@@ -314,7 +314,7 @@ public class SortImpl {
 
     // Update the minimum buffer space metric.
 
-    metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize());
+    metrics.updateInputMetrics(sizer.rowCount(), sizer.getActualSize());
     metrics.updateMemory(memManager.freeMemory(endMem));
     metrics.updatePeakBatches(bufferedBatches.size());
 
@@ -322,8 +322,8 @@ public class SortImpl {
     // the effective count as given by the selection vector
     // (which may exclude some records due to filtering.)
 
-    validateBatchSize(sizer.actualSize(), batchSize);
-    if (memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount())) {
+    validateBatchSize(sizer.getActualSize(), batchSize);
+    if (memManager.updateEstimates((int) batchSize, sizer.getNetRowWidth(), sizer.rowCount())) {
 
       // If estimates changed, discard the helper based on the old estimates.
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 16b06fe..2ebe887 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -37,13 +37,14 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
 
   private int updateInternal(int inputIndex, int outputPosition,  boolean useAggregate) {
     updateIncomingStats(inputIndex);
-    rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize();
+    rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
     final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
 
-    // If outgoing row width is 0, just return. This is possible for empty batches or
+    // If outgoing row width is 0 or is unchanged from the current outgoing row width, just return.
+    // A row width of 0 is possible for empty batches or
     // when first set of batches come with OK_NEW_SCHEMA and no data.
-    if (newOutgoingRowWidth == 0) {
+    if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
       return getOutputRowCount();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 993f3cb..a270ced 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -157,9 +157,9 @@ public class RecordBatchMemoryManager {
   public void update(RecordBatch recordBatch, int index) {
     // Get sizing information for the batch.
     setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
-    setOutgoingRowWidth(getRecordBatchSizer(index).netRowWidth());
+    setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth());
     // Number of rows in outgoing batch
-    setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).netRowWidth());
+    setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth());
     updateIncomingStats(index);
   }
 
@@ -201,6 +201,10 @@ public class RecordBatchMemoryManager {
     return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
   }
 
+  public static int computeRowCount(int batchSize, int rowWidth) {
+    return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
+  }
+
   public void setOutgoingRowWidth(int outgoingRowWidth) {
     this.outgoingRowWidth = outgoingRowWidth;
   }
@@ -249,13 +253,13 @@ public class RecordBatchMemoryManager {
     Preconditions.checkArgument(index >= 0 && index < numInputs);
     Preconditions.checkArgument(inputBatchStats[index] != null);
     inputBatchStats[index].incNumBatches();
-    inputBatchStats[index].incSumBatchSizes(sizer[index].netSize());
+    inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize());
     inputBatchStats[index].incTotalRecords(sizer[index].rowCount());
   }
 
   public void updateIncomingStats() {
     inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches();
-    inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].netSize());
+    inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize());
     inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount());
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 4b8ae80..83287ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
-import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVector;
@@ -53,11 +52,6 @@ import static org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTO
  */
 
 public class RecordBatchSizer {
-  // TODO consolidate common memory estimation helpers
-  public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER;
-  public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER;
-  public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD;
-
   private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
   private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
 
@@ -644,12 +638,6 @@ public class RecordBatchSizer {
    */
   private int rowCount;
   /**
-   * Standard row width using Drill meta-data. Note: this information is
-   * 100% bogus. Do not use it.
-   */
-  @Deprecated
-  private int stdRowWidth;
-  /**
    * Actual batch size summing all buffers used to store data
    * for the batch.
    */
@@ -669,9 +657,12 @@ public class RecordBatchSizer {
   /**
    * actual row size if input is not empty. Otherwise, standard size.
    */
-  private int rowAllocSize;
-  private boolean hasSv2;
+  private int rowAllocWidth;
+  private int stdRowWidth;
+
+  public SelectionVector2 sv2 = null;
   private int sv2Size;
+
   private int avgDensity;
 
   private Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
@@ -724,47 +715,24 @@ public class RecordBatchSizer {
     for (VectorWrapper<?> vw : va) {
       ColumnSize colSize = measureColumn(vw.getValueVector(), "");
       columnSizes.put(vw.getField().getName(), colSize);
-      stdRowWidth += colSize.getStdDataSizePerEntry();
       netBatchSize += colSize.getTotalNetSize();
       maxSize = Math.max(maxSize, colSize.getTotalDataSize());
       if (colSize.metadata.isNullable()) {
         nullableCount++;
       }
       netRowWidth += colSize.getNetSizePerEntry();
-      rowAllocSize += colSize.getAllocSizePerEntry();
-    }
-
-    for (BufferLedger ledger : ledgers) {
-      accountedMemorySize += ledger.getAccountedSize();
-    }
-
-    if (rowCount > 0) {
-      grossRowWidth = safeDivide(accountedMemorySize, rowCount);
-    }
-
-    if (sv2 != null) {
-      sv2Size = sv2.getBuffer(false).capacity();
-      accountedMemorySize += sv2Size;
-      hasSv2 = true;
     }
-
-    computeEstimates();
-  }
-
-  private void computeEstimates() {
-    grossRowWidth = safeDivide(accountedMemorySize, rowCount);
-    avgDensity = safeDivide(netBatchSize * 100L, accountedMemorySize);
+    this.sv2 = sv2;
   }
 
   public void applySv2() {
-    if (hasSv2) {
+    if (sv2 == null) {
       return;
     }
 
-    hasSv2 = true;
     sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount);
+    avgDensity = safeDivide(netBatchSize * 100L, getActualSize());
     accountedMemorySize += sv2Size;
-    computeEstimates();
   }
 
   /**
@@ -856,10 +824,64 @@ public class RecordBatchSizer {
   }
 
   public int rowCount() { return rowCount; }
-  public int stdRowWidth() { return stdRowWidth; }
-  public int grossRowWidth() { return grossRowWidth; }
-  public int netRowWidth() { return netRowWidth; }
-  public int getRowAllocSize() { return rowAllocSize; }
+
+  public int getStdRowWidth() {
+    if (stdRowWidth != 0) {
+      return stdRowWidth;
+    }
+
+    for (ColumnSize columnSize : columnSizes.values()) {
+      stdRowWidth += columnSize.getStdDataSizePerEntry();
+    }
+
+    return stdRowWidth;
+  }
+
+  public int getRowAllocWidth() {
+    if (rowAllocWidth != 0) {
+      return rowAllocWidth;
+    }
+
+    for (ColumnSize columnSize : columnSizes.values()) {
+      rowAllocWidth += columnSize.getAllocSizePerEntry();
+    }
+
+    return rowAllocWidth;
+  }
+
+  public long getActualSize() {
+    if (accountedMemorySize != 0) {
+      return accountedMemorySize;
+    }
+
+    for (BufferLedger ledger : ledgers) {
+      accountedMemorySize += ledger.getAccountedSize();
+    }
+
+    if (sv2 != null) {
+      sv2Size = sv2.getBuffer(false).capacity();
+      accountedMemorySize += sv2Size;
+    }
+
+    return accountedMemorySize;
+  }
+
+  public int getGrossRowWidth() {
+    if (grossRowWidth != 0) {
+      return grossRowWidth;
+    }
+
+    grossRowWidth = safeDivide(getActualSize(), rowCount);
+
+    return grossRowWidth;
+  }
+
+  public int getAvgDensity() {
+    return safeDivide(netBatchSize * 100L, getActualSize());
+  }
+
+
+  public int getNetRowWidth() { return netRowWidth; }
   public Map<String, ColumnSize> columns() { return columnSizes; }
 
   /**
@@ -868,12 +890,10 @@ public class RecordBatchSizer {
    * and null marking columns.
    * @return "real" width of the row
    */
-  public int netRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
-  public long actualSize() { return accountedMemorySize; }
-  public boolean hasSv2() { return hasSv2; }
-  public int avgDensity() { return avgDensity; }
-  public long netSize() { return netBatchSize; }
-  public int maxAvgColumnSize() { return maxSize / rowCount; }
+  public int getNetRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
+  public boolean hasSv2() { return sv2 != null; }
+  public long getNetBatchSize() { return netBatchSize; }
+  public int getMaxAvgColumnSize() { return safeDivide(maxSize, rowCount); }
 
   @Override
   public String toString() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index a029832..fd0b494 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -73,7 +73,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     long totalSize = 0;
     for (VectorAccessible batch : batches) {
       RecordBatchSizer sizer = new RecordBatchSizer(batch);
-      totalSize += sizer.netSize();
+      totalSize += sizer.getNetBatchSize();
     }
     return totalSize;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 051f4b3..e037d02 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -373,7 +373,7 @@ public class DrillTestWrapper {
         RecordBatchSizer sizer = new RecordBatchSizer(loader);
         // Not checking actualSize as accounting is not correct when we do
         // split and transfer ownership across operators.
-        Assert.assertTrue(sizer.netSize() <= expectedBatchSize);
+        Assert.assertTrue(sizer.getNetBatchSize() <= expectedBatchSize);
       }
 
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
index 71ca3cf..297a1c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -52,7 +52,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin
   @Override
   public long size() {
     RecordBatchSizer sizer = new RecordBatchSizer(container());
-    return sizer.actualSize();
+    return sizer.getActualSize();
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index a2bc5e8..f0ebdc0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -139,6 +139,6 @@ public class IndirectRowSet extends AbstractSingleRowSet {
   @Override
   public long size() {
     RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
-    return sizer.actualSize();
+    return sizer.getActualSize();
   }
 }