You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/06/29 00:44:43 UTC
[drill] 01/02: DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 02255d8573497cb88e19624ea0054d0121498646
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Mon Jun 25 19:06:51 2018 -0700
DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer
closes #1341
---
.../physical/impl/aggregate/HashAggTemplate.java | 4 +-
.../exec/physical/impl/common/HashPartition.java | 2 +-
.../physical/impl/common/HashTableTemplate.java | 2 +-
.../physical/impl/flatten/FlattenRecordBatch.java | 2 +-
.../exec/physical/impl/xsort/managed/SortImpl.java | 10 +-
.../drill/exec/record/JoinBatchMemoryManager.java | 7 +-
.../exec/record/RecordBatchMemoryManager.java | 12 ++-
.../apache/drill/exec/record/RecordBatchSizer.java | 120 ++++++++++++---------
.../exec/physical/unit/TestOutputBatchSize.java | 2 +-
.../org/apache/drill/test/DrillTestWrapper.java | 2 +-
.../drill/test/rowSet/AbstractSingleRowSet.java | 2 +-
.../apache/drill/test/rowSet/IndirectRowSet.java | 2 +-
12 files changed, 96 insertions(+), 71 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 320f296..3b50471 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -517,10 +517,10 @@ public abstract class HashAggTemplate implements HashAggregator {
logger.trace("Incoming sizer: {}",sizer);
// An empty batch only has the schema, can not tell actual length of varchars
// else use the actual varchars length, each capped at 50 (to match the space allocation)
- long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+ long estInputRowWidth = sizer.rowCount() == 0 ? sizer.getStdRowWidth() : sizer.getNetRowWidthCap50();
// Get approx max (varchar) column width to get better memory allocation
- maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+ maxColumnWidth = Math.max(sizer.getMaxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
//
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index e525530..d80237c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -249,7 +249,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
tmpBatchesList.add(currentBatch);
partitionBatchesCount++;
- long batchSize = new RecordBatchSizer(currentBatch).actualSize();
+ long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
partitionInMemorySize += batchSize;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index bb0b1ad..6c9a398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -438,7 +438,7 @@ public abstract class HashTableTemplate implements HashTable {
size += ledger.getAccountedSize();
}
- size += new RecordBatchSizer(htContainer).actualSize();
+ size += new RecordBatchSizer(htContainer).getActualSize();
return size;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 2f92d52..8421d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -136,7 +136,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
// Average rowWidth excluding the flatten column.
- final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn;
+ final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
// Average rowWidth of single element in the flatten list.
// subtract the offset vector size from column data size.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 55a20bd..03fb751 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -298,14 +298,14 @@ public class SortImpl {
// during the transfer, we immediately follow the transfer with an SV2
// allocation that will fail if we are over the allocation limit.
- if (isSpillNeeded(sizer.actualSize())) {
+ if (isSpillNeeded(sizer.getActualSize())) {
spillFromMemory();
}
// Sanity check. We should now be below the buffer memory maximum.
long startMem = allocator.getAllocatedMemory();
- bufferedBatches.add(incoming, sizer.netSize());
+ bufferedBatches.add(incoming, sizer.getNetBatchSize());
// Compute batch size, including allocation of an sv2.
@@ -314,7 +314,7 @@ public class SortImpl {
// Update the minimum buffer space metric.
- metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize());
+ metrics.updateInputMetrics(sizer.rowCount(), sizer.getActualSize());
metrics.updateMemory(memManager.freeMemory(endMem));
metrics.updatePeakBatches(bufferedBatches.size());
@@ -322,8 +322,8 @@ public class SortImpl {
// the effective count as given by the selection vector
// (which may exclude some records due to filtering.)
- validateBatchSize(sizer.actualSize(), batchSize);
- if (memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount())) {
+ validateBatchSize(sizer.getActualSize(), batchSize);
+ if (memManager.updateEstimates((int) batchSize, sizer.getNetRowWidth(), sizer.rowCount())) {
// If estimates changed, discard the helper based on the old estimates.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 16b06fe..2ebe887 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -37,13 +37,14 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
- rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize();
+ rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
- // If outgoing row width is 0, just return. This is possible for empty batches or
+ // If outgoing row width is 0 or there is no change in outgoing row width, just return.
+ // This is possible for empty batches or
// when first set of batches come with OK_NEW_SCHEMA and no data.
- if (newOutgoingRowWidth == 0) {
+ if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
return getOutputRowCount();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 993f3cb..a270ced 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -157,9 +157,9 @@ public class RecordBatchMemoryManager {
public void update(RecordBatch recordBatch, int index) {
// Get sizing information for the batch.
setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
- setOutgoingRowWidth(getRecordBatchSizer(index).netRowWidth());
+ setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth());
// Number of rows in outgoing batch
- setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).netRowWidth());
+ setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth());
updateIncomingStats(index);
}
@@ -201,6 +201,10 @@ public class RecordBatchMemoryManager {
return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
}
+ public static int computeRowCount(int batchSize, int rowWidth) {
+ return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
+ }
+
public void setOutgoingRowWidth(int outgoingRowWidth) {
this.outgoingRowWidth = outgoingRowWidth;
}
@@ -249,13 +253,13 @@ public class RecordBatchMemoryManager {
Preconditions.checkArgument(index >= 0 && index < numInputs);
Preconditions.checkArgument(inputBatchStats[index] != null);
inputBatchStats[index].incNumBatches();
- inputBatchStats[index].incSumBatchSizes(sizer[index].netSize());
+ inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize());
inputBatchStats[index].incTotalRecords(sizer[index].rowCount());
}
public void updateIncomingStats() {
inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches();
- inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].netSize());
+ inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize());
inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 4b8ae80..83287ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.memory.BaseAllocator;
-import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVector;
@@ -53,11 +52,6 @@ import static org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTO
*/
public class RecordBatchSizer {
- // TODO consolidate common memory estimation helpers
- public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER;
- public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER;
- public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD;
-
private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
@@ -644,12 +638,6 @@ public class RecordBatchSizer {
*/
private int rowCount;
/**
- * Standard row width using Drill meta-data. Note: this information is
- * 100% bogus. Do not use it.
- */
- @Deprecated
- private int stdRowWidth;
- /**
* Actual batch size summing all buffers used to store data
* for the batch.
*/
@@ -669,9 +657,12 @@ public class RecordBatchSizer {
/**
* actual row size if input is not empty. Otherwise, standard size.
*/
- private int rowAllocSize;
- private boolean hasSv2;
+ private int rowAllocWidth;
+ private int stdRowWidth;
+
+ public SelectionVector2 sv2 = null;
private int sv2Size;
+
private int avgDensity;
private Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
@@ -724,47 +715,24 @@ public class RecordBatchSizer {
for (VectorWrapper<?> vw : va) {
ColumnSize colSize = measureColumn(vw.getValueVector(), "");
columnSizes.put(vw.getField().getName(), colSize);
- stdRowWidth += colSize.getStdDataSizePerEntry();
netBatchSize += colSize.getTotalNetSize();
maxSize = Math.max(maxSize, colSize.getTotalDataSize());
if (colSize.metadata.isNullable()) {
nullableCount++;
}
netRowWidth += colSize.getNetSizePerEntry();
- rowAllocSize += colSize.getAllocSizePerEntry();
- }
-
- for (BufferLedger ledger : ledgers) {
- accountedMemorySize += ledger.getAccountedSize();
- }
-
- if (rowCount > 0) {
- grossRowWidth = safeDivide(accountedMemorySize, rowCount);
- }
-
- if (sv2 != null) {
- sv2Size = sv2.getBuffer(false).capacity();
- accountedMemorySize += sv2Size;
- hasSv2 = true;
}
-
- computeEstimates();
- }
-
- private void computeEstimates() {
- grossRowWidth = safeDivide(accountedMemorySize, rowCount);
- avgDensity = safeDivide(netBatchSize * 100L, accountedMemorySize);
+ this.sv2 = sv2;
}
public void applySv2() {
- if (hasSv2) {
+ if (sv2 == null) {
return;
}
- hasSv2 = true;
sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount);
+ avgDensity = safeDivide(netBatchSize * 100L, getActualSize());
accountedMemorySize += sv2Size;
- computeEstimates();
}
/**
@@ -856,10 +824,64 @@ public class RecordBatchSizer {
}
public int rowCount() { return rowCount; }
- public int stdRowWidth() { return stdRowWidth; }
- public int grossRowWidth() { return grossRowWidth; }
- public int netRowWidth() { return netRowWidth; }
- public int getRowAllocSize() { return rowAllocSize; }
+
+ public int getStdRowWidth() {
+ if (stdRowWidth != 0) {
+ return stdRowWidth;
+ }
+
+ for (ColumnSize columnSize : columnSizes.values()) {
+ stdRowWidth += columnSize.getStdDataSizePerEntry();
+ }
+
+ return stdRowWidth;
+ }
+
+ public int getRowAllocWidth() {
+ if (rowAllocWidth != 0) {
+ return rowAllocWidth;
+ }
+
+ for (ColumnSize columnSize : columnSizes.values()) {
+ rowAllocWidth += columnSize.getAllocSizePerEntry();
+ }
+
+ return rowAllocWidth;
+ }
+
+ public long getActualSize() {
+ if (accountedMemorySize != 0) {
+ return accountedMemorySize;
+ }
+
+ for (BufferLedger ledger : ledgers) {
+ accountedMemorySize += ledger.getAccountedSize();
+ }
+
+ if (sv2 != null) {
+ sv2Size = sv2.getBuffer(false).capacity();
+ accountedMemorySize += sv2Size;
+ }
+
+ return accountedMemorySize;
+ }
+
+ public int getGrossRowWidth() {
+ if (grossRowWidth != 0) {
+ return grossRowWidth;
+ }
+
+ grossRowWidth = safeDivide(getActualSize(), rowCount);
+
+ return grossRowWidth;
+ }
+
+ public int getAvgDensity() {
+ return safeDivide(netBatchSize * 100L, getActualSize());
+ }
+
+
+ public int getNetRowWidth() { return netRowWidth; }
public Map<String, ColumnSize> columns() { return columnSizes; }
/**
@@ -868,12 +890,10 @@ public class RecordBatchSizer {
* and null marking columns.
* @return "real" width of the row
*/
- public int netRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
- public long actualSize() { return accountedMemorySize; }
- public boolean hasSv2() { return hasSv2; }
- public int avgDensity() { return avgDensity; }
- public long netSize() { return netBatchSize; }
- public int maxAvgColumnSize() { return maxSize / rowCount; }
+ public int getNetRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
+ public boolean hasSv2() { return sv2 != null; }
+ public long getNetBatchSize() { return netBatchSize; }
+ public int getMaxAvgColumnSize() { return safeDivide(maxSize, rowCount); }
@Override
public String toString() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index a029832..fd0b494 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -73,7 +73,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
long totalSize = 0;
for (VectorAccessible batch : batches) {
RecordBatchSizer sizer = new RecordBatchSizer(batch);
- totalSize += sizer.netSize();
+ totalSize += sizer.getNetBatchSize();
}
return totalSize;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 051f4b3..e037d02 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -373,7 +373,7 @@ public class DrillTestWrapper {
RecordBatchSizer sizer = new RecordBatchSizer(loader);
// Not checking actualSize as accounting is not correct when we do
// split and transfer ownership across operators.
- Assert.assertTrue(sizer.netSize() <= expectedBatchSize);
+ Assert.assertTrue(sizer.getNetBatchSize() <= expectedBatchSize);
}
// TODO: Clean: DRILL-2933: That load(...) no longer throws
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
index 71ca3cf..297a1c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -52,7 +52,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin
@Override
public long size() {
RecordBatchSizer sizer = new RecordBatchSizer(container());
- return sizer.actualSize();
+ return sizer.getActualSize();
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index a2bc5e8..f0ebdc0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -139,6 +139,6 @@ public class IndirectRowSet extends AbstractSingleRowSet {
@Override
public long size() {
RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
- return sizer.actualSize();
+ return sizer.getActualSize();
}
}