You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:44:53 UTC
[drill] 01/13: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit c64436774387e80fd9b0ff6e9cd7d42c9aa7a961
Author: karthik <km...@maprtech.com>
AuthorDate: Mon Jun 4 17:00:31 2018 -0700
DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
This change fixes the incorrect accounting in the case where a column is being projected more than once
closes #1375
---
.../impl/project/OutputWidthExpression.java | 17 ++++----
.../physical/impl/project/OutputWidthVisitor.java | 2 +-
.../impl/project/OutputWidthVisitorState.java | 7 +---
.../impl/project/ProjectMemoryManager.java | 48 ++++++++++------------
.../physical/impl/project/ProjectRecordBatch.java | 22 +++++-----
5 files changed, 42 insertions(+), 54 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d6..84a3f46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public abstract class OutputWidthExpression {
}
/**
- * VarLenReadExpr captures the name of a variable length column that is used (read) in an expression.
- * The captured name will be used to lookup the average entry size for the column in the corresponding
+ * VarLenReadExpr captures the inputColumnName and the readExpression used to read a variable length column.
+ * The captured inputColumnName will be used to lookup the average entry size for the column in the corresponding.
+ * If inputColumnName is null then the readExpression is used to get the name of the column.
* {@link org.apache.drill.exec.record.RecordBatchSizer}
*/
public static class VarLenReadExpr extends OutputWidthExpression {
ValueVectorReadExpression readExpression;
- String name;
+ String inputColumnName;
public VarLenReadExpr(ValueVectorReadExpression readExpression) {
this.readExpression = readExpression;
- this.name = null;
+ this.inputColumnName = null;
}
- public VarLenReadExpr(String name) {
+ public VarLenReadExpr(String inputColumnName) {
this.readExpression = null;
- this.name = name;
+ this.inputColumnName = inputColumnName;
}
public ValueVectorReadExpression getReadExpression() {
return readExpression;
}
- public String getName() {
- return name;
+ public String getInputColumnName() {
+ return inputColumnName;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb58795..70908bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
@Override
public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state)
throws RuntimeException {
- String columnName = varLenReadExpr.getName();
+ String columnName = varLenReadExpr.getInputColumnName();
if (columnName == null) {
TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId();
columnName = TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1..e18c827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@ package org.apache.drill.exec.physical.impl.project;
public class OutputWidthVisitorState {
ProjectMemoryManager manager;
- ProjectMemoryManager.OutputColumnType outputColumnType;
- public OutputWidthVisitorState(ProjectMemoryManager manager, ProjectMemoryManager.OutputColumnType outputColumnType) {
+ public OutputWidthVisitorState(ProjectMemoryManager manager) {
this.manager = manager;
- this.outputColumnType = outputColumnType;
}
public ProjectMemoryManager getManager() {
return manager;
}
- public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
- return outputColumnType;
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b09..03c849c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.project;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
}
class ColumnWidthInfo {
- //MaterializedField materializedField;
OutputWidthExpression outputExpression;
int width;
WidthType widthType;
OutputColumnType outputColumnType;
- String name;
- ColumnWidthInfo(ValueVector vv,
- OutputWidthExpression outputWidthExpression,
+ ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
OutputColumnType outputColumnType,
WidthType widthType,
int fieldWidth) {
@@ -104,8 +102,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
this.width = fieldWidth;
this.outputColumnType = outputColumnType;
this.widthType = widthType;
- String columnName = vv.getField().getName();
- this.name = columnName;
}
public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -116,7 +112,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
public int getWidth() { return width; }
- public String getName() { return name; }
}
void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
}
- void addTransferField(ValueVector vvOut, String path) {
- addField(vvOut, null, OutputColumnType.TRANSFER, path);
+ void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
+ addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
}
- void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
- addField(vv, logicalExpression, OutputColumnType.NEW, null);
+ void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+ addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
}
- void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, String path) {
+ void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
+ String inputColumnName, String outputColumnName) {
if(isFixedWidth(vv)) {
addFixedWidthField(vv);
} else {
- addVariableWidthField(vv, logicalExpression, outputColumnType, path);
+ addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
}
}
private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
- OutputColumnType outputColumnType, String path) {
+ OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
variableWidthColumnCount++;
ColumnWidthInfo columnWidthInfo;
//Variable width transfers
if(outputColumnType == OutputColumnType.TRANSFER) {
- String columnName = path;
- VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
- columnWidthInfo = new ColumnWidthInfo(vv, readExpr, outputColumnType,
+ VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+ columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
} else if (isComplex(vv.getField().getType())) {
addComplexField(vv);
return;
} else {
// Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
- OutputWidthVisitorState state = new OutputWidthVisitorState(this, outputColumnType);
+ OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
- columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, outputColumnType,
+ columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
}
- outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+ ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
+ Preconditions.checkState(existingInfo == null);
}
void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
setRecordBatchSizer(batchSizer);
rowWidth = 0;
int totalVariableColumnWidth = 0;
- for (String expr : outputColumnSizes.keySet()) {
- ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+ for (String outputColumnName : outputColumnSizes.keySet()) {
+ ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName);
int width = -1;
if (columnWidthInfo.isFixedWidth()) {
// fixed width columns are accumulated in totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
//As the tree is walked, the RecordBatchSizer and function annotations
//are looked-up to come up with the final FixedLenExpr
OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
- OutputColumnType columnType = columnWidthInfo.getOutputColumnType();
- OutputWidthVisitorState state = new OutputWidthVisitorState(this, columnType);
+ OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
- assert reducedExpr instanceof FixedLenExpr;
width = ((FixedLenExpr)reducedExpr).getWidth();
- assert width >= 0;
+ Preconditions.checkState(width >= 0);
}
totalVariableColumnWidth += width;
}
@@ -301,7 +295,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}"
+ ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms"
+ ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
- totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
+ rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
(batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0..dd93325 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
-
- // get the output batch size from config.
- int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
- memoryManager = new ProjectMemoryManager(configuredBatchSize);
}
@Override
@@ -367,6 +362,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
long setupNewSchemaStartTime = System.currentTimeMillis();
+ // get the output batch size from config.
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ memoryManager = new ProjectMemoryManager(configuredBatchSize);
memoryManager.init(incomingBatch, this);
if (allocationVectors != null) {
for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
vvIn.getField().getType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
- memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+ memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName());
transfers.add(tp);
}
} else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
vectorRead.getMajorType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
- memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch));
+ memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName());
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
} else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
memoryManager.addComplexField(null); // this will just add an estimate to the row width
} else {
// need to do evaluation.
- final ValueVector vector = container.addOrGet(outputField, callBack);
- allocationVectors.add(vector);
+ final ValueVector ouputVector = container.addOrGet(outputField, callBack);
+ allocationVectors.add(ouputVector);
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
- memoryManager.addNewField(vector, write);
+ memoryManager.addNewField(ouputVector, write);
// We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final TypedFieldId id = vectorRead.getFieldId();
final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(),
id.getFieldIds()).getValueVector();
- vvIn.makeTransferPair(vector);
+ vvIn.makeTransferPair(ouputVector);
}
}
}