You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/03/08 02:55:16 UTC
[7/8] drill git commit: DRILL-6126: Allocate memory for value vectors upfront in flatten operator
DRILL-6126: Allocate memory for value vectors upfront in flatten operator
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/31e0f29a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/31e0f29a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/31e0f29a
Branch: refs/heads/master
Commit: 31e0f29a6140a19eda8de615e615208f51f2cf96
Parents: 47c5d1f
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Tue Mar 6 16:09:13 2018 -0800
Committer: Ben-Zvi <bb...@mapr.com>
Committed: Wed Mar 7 15:41:26 2018 -0800
----------------------------------------------------------------------
.../impl/flatten/FlattenRecordBatch.java | 34 +++++++++++---------
.../AbstractRecordBatchMemoryManager.java | 24 ++++++++++++--
2 files changed, 40 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 4a910ef..9dd1770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -104,25 +104,24 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
@Override
public void update() {
// Get sizing information for the batch.
- RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+ setRecordBatchSizer(new RecordBatchSizer(incoming));
final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
// Get column size of flatten column.
- RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
- typedFieldId.getFieldIds()).getValueVector(), field.getName());
+ RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName());
// Average rowWidth of flatten column
- final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
+ final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
// Average rowWidth excluding the flatten column.
- final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
+ final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn;
// Average rowWidth of single element in the flatten list.
// subtract the offset vector size from column data size.
final int avgRowWidthSingleFlattenEntry =
- RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
+ RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (OFFSET_VECTOR_WIDTH * columnSize.getValueCount()), columnSize.getElementCount());
// Average rowWidth of outgoing batch.
final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
@@ -130,13 +129,16 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
// Number of rows in outgoing batch
setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
+ // Limit to lower bound of total number of rows possible for this batch
+ // i.e. all rows fit within memory budget.
+ setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
+
logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
- "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+ "avgOutgoingRowWidth : {}, outputRowCount : {}", getRecordBatchSizer(), outputBatchSize,
+ avgOutgoingRowWidth, getOutputRowCount());
}
-
}
-
public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
@@ -199,7 +201,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
int incomingRecordCount = incoming.getRecordCount();
- if (!doAlloc()) {
+ if (!doAlloc(flattenMemoryManager.getOutputRowCount())) {
outOfMemory = true;
return IterOutcome.OUT_OF_MEMORY;
}
@@ -235,7 +237,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
private void handleRemainder() {
int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex;
- if (!doAlloc()) {
+ if (!doAlloc(remainingRecordCount)) {
outOfMemory = true;
return;
}
@@ -266,12 +268,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
complexWriters.add(writer);
}
- private boolean doAlloc() {
- //Allocate vv in the allocationVectors.
+ private boolean doAlloc(int recordCount) {
+
for (ValueVector v : this.allocationVectors) {
- if (!v.allocateNewSafe()) {
- return false;
- }
+ // This will iteratively allocate memory for nested columns underneath.
+ RecordBatchSizer.ColumnSize colSize = flattenMemoryManager.getColumnSize(v.getField().getName());
+ colSize.allocateVector(v, recordCount);
}
//Allocate vv for complexWriters.
http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
index b91ede0..1abd365 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
@@ -27,6 +27,7 @@ public abstract class AbstractRecordBatchMemoryManager {
protected static final int MIN_NUM_ROWS = 1;
private int outputRowCount = MAX_NUM_ROWS;
private int outgoingRowWidth;
+ private RecordBatchSizer sizer;
public void update(int inputIndex) {};
@@ -41,14 +42,20 @@ public abstract class AbstractRecordBatchMemoryManager {
* the min and max that is allowed.
*/
public void setOutputRowCount(int targetBatchSize, int rowWidth) {
- this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, rowWidth));
+ this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
+ }
+
+ public void setOutputRowCount(int outputRowCount) {
+ this.outputRowCount = outputRowCount;
}
/**
* This will adjust rowCount taking into account the min and max that is allowed.
+ * We will round down to nearest power of two - 1 for better memory utilization.
+ * -1 is done for adjusting accounting for offset vectors.
*/
public static int adjustOutputRowCount(int rowCount) {
- return (Math.min(MAX_NUM_ROWS, Math.max(rowCount, MIN_NUM_ROWS)));
+ return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
}
public void setOutgoingRowWidth(int outgoingRowWidth) {
@@ -58,4 +65,17 @@ public abstract class AbstractRecordBatchMemoryManager {
public int getOutgoingRowWidth() {
return outgoingRowWidth;
}
+
+ public void setRecordBatchSizer(RecordBatchSizer sizer) {
+ this.sizer = sizer;
+ }
+
+ public RecordBatchSizer getRecordBatchSizer() {
+ return sizer;
+ }
+
+ public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+ return sizer.getColumn(name);
+ }
+
}