You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/08/25 06:15:06 UTC
[drill] branch master updated: DRILL-6688 Data batches for Project
operator exceed the maximum specified (#1442)
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new ddb35ce DRILL-6688 Data batches for Project operator exceed the maximum specified (#1442)
ddb35ce is described below
commit ddb35ce71837376c7caef28c25327ba556bb32f2
Author: Karthikeyan Manivannan <km...@mapr.com>
AuthorDate: Fri Aug 24 23:15:02 2018 -0700
DRILL-6688 Data batches for Project operator exceed the maximum specified (#1442)
This change separates the metadata width from the data width of a variable-width column, so that only the data width is used in all intermediate calculations; the metadata width is added at the end, when the column's width is accumulated into the total width.
---
.../expr/fn/output/OutputWidthCalculators.java | 4 +-
.../impl/project/OutputWidthExpression.java | 13 ++++-
.../physical/impl/project/OutputWidthVisitor.java | 12 ++--
.../impl/project/ProjectMemoryManager.java | 68 +++++++++++++++++-----
4 files changed, 73 insertions(+), 24 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
index 7c0bb38..cb6ff82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
@@ -59,7 +59,7 @@ public class OutputWidthCalculators {
throw new IllegalArgumentException();
}
for (FixedLenExpr expr : args) {
- outputSize += expr.getWidth();
+ outputSize += expr.getDataWidth();
}
outputSize = adjustOutputWidth(outputSize, "ConcatOutputWidthCalculator:");
return outputSize;
@@ -85,7 +85,7 @@ public class OutputWidthCalculators {
if (args == null || args.size() < 1) {
throw new IllegalArgumentException();
}
- outputSize = args.get(0).getWidth();
+ outputSize = args.get(0).getDataWidth();
outputSize = adjustOutputWidth(outputSize, "CloneOutputWidthCalculator:");
return outputSize;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index 84a3f46..2d5f640 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -133,11 +133,18 @@ public abstract class OutputWidthExpression {
*/
public static class FixedLenExpr extends OutputWidthExpression {
- int fixedWidth;
+ /**
+ * Only the width of the payload is saved in fixedDataWidth.
+ * Metadata width is added when the final output row size is calculated.
+ * This is to avoid function {@link OutputWidthCalculator} from using
+ * metadata width in the calculations.
+ */
+ private int fixedDataWidth;
+
public FixedLenExpr(int fixedWidth) {
- this.fixedWidth = fixedWidth;
+ this.fixedDataWidth = fixedWidth;
}
- public int getWidth() { return fixedWidth;}
+ public int getDataWidth() { return fixedDataWidth;}
@Override
public <T, V, E extends Exception> T accept(AbstractExecExprVisitor<T, V, E> visitor, V value) throws E {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index 70908bf..d53adc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -212,7 +212,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
}
final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName);
- int columnWidth = columnSize.getNetSizePerEntry();
+ int columnWidth = columnSize.getDataSizePerEntry();
return new FixedLenExpr(columnWidth);
}
@@ -256,12 +256,12 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
throws RuntimeException {
OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state);
assert ifReducedExpr instanceof FixedLenExpr;
- int ifWidth = ((FixedLenExpr)ifReducedExpr).getWidth();
+ int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
int elseWidth = -1;
if (ifElseWidthExpr.expressions[1] != null) {
OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state);
assert elseReducedExpr instanceof FixedLenExpr;
- elseWidth = ((FixedLenExpr)elseReducedExpr).getWidth();
+ elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
}
int outputWidth = Math.max(ifWidth, elseWidth);
return new FixedLenExpr(outputWidth);
@@ -270,8 +270,10 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
MajorType type = majorType;
if (Types.isFixedWidthType(type)) {
- int fixedWidth = ProjectMemoryManager.getWidthOfFixedWidthType(type);
- return new OutputWidthExpression.FixedLenExpr(fixedWidth);
+ // Use only the width of the data. Metadata width will be accounted for at the end
+ // This is to avoid using metadata size in intermediate calculations
+ int fixedDataWidth = ProjectMemoryManager.getDataWidthOfFixedWidthType(type);
+ return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
}
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 03c849c..c3a983b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -20,17 +20,21 @@ package org.apache.drill.exec.physical.impl.project;
import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.FixedLenExpr;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import java.util.HashMap;
import java.util.Map;
@@ -93,15 +97,18 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
int width;
WidthType widthType;
OutputColumnType outputColumnType;
+ ValueVector outputVV; // for transfers, this is the transfer src
+
ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
OutputColumnType outputColumnType,
WidthType widthType,
- int fieldWidth) {
+ int fieldWidth, ValueVector outputVV) {
this.outputExpression = outputWidthExpression;
this.width = fieldWidth;
this.outputColumnType = outputColumnType;
this.widthType = widthType;
+ this.outputVV = outputVV;
}
public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -151,13 +158,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
static boolean isFixedWidth(ValueVector vv) { return (vv instanceof FixedWidthVector); }
- static int getWidthOfFixedWidthType(ValueVector vv) {
+ static int getNetWidthOfFixedWidthType(ValueVector vv) {
assert isFixedWidth(vv);
return ((FixedWidthVector)vv).getValueWidth();
}
- public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
- DataMode mode = majorType.getMode();
+ public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
MinorType minorType = majorType.getMinorType();
final boolean isVariableWidth = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
|| minorType == MinorType.VARBINARY);
@@ -166,12 +172,11 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
}
- final boolean isOptional = (mode == DataMode.OPTIONAL);
- final boolean isRepeated = (mode == DataMode.REPEATED);
- final boolean isRepeatedList = false; // repeated
- final Map<String, RecordBatchSizer.ColumnSize> children = null;
+ if (minorType == MinorType.NULL) {
+ return 0;
+ }
- return RecordBatchSizer.getStdNetSizePerEntryCommon(majorType, isOptional, isRepeated, isRepeatedList, children);
+ return TypeHelper.getSize(majorType);
}
@@ -196,11 +201,13 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
variableWidthColumnCount++;
ColumnWidthInfo columnWidthInfo;
+ logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
+ printVV(vv), variableWidthColumnCount, outputColumnType);
//Variable width transfers
if(outputColumnType == OutputColumnType.TRANSFER) {
VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
- WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
+ WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
} else if (isComplex(vv.getField().getType())) {
addComplexField(vv);
return;
@@ -209,25 +216,37 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
- WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
+ WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
}
ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
Preconditions.checkState(existingInfo == null);
}
+ public static String printVV(ValueVector vv) {
+ String str = "null";
+ if (vv != null) {
+ str = vv.getField().getName() + " " + vv.getField().getType();
+ }
+ return str;
+ }
+
void addComplexField(ValueVector vv) {
//Complex types are not yet supported. Just use a guess for the size
assert vv == null || isComplex(vv.getField().getType());
complexColumnsCount++;
// just a guess
totalComplexColumnWidth += OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
+ logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+ printVV(vv), complexColumnsCount, totalComplexColumnWidth);
}
void addFixedWidthField(ValueVector vv) {
assert isFixedWidth(vv);
fixedWidthColumnCount++;
- int fixedFieldWidth = getWidthOfFixedWidthType(vv);
+ int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
totalFixedWidthColumnWidth += fixedFieldWidth;
+ logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+ printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth);
}
public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
@@ -267,8 +286,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
- width = ((FixedLenExpr)reducedExpr).getWidth();
+ width = ((FixedLenExpr)reducedExpr).getDataWidth();
Preconditions.checkState(width >= 0);
+ int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
+ logger.trace("update(): fieldName {} width: {} metadataWidth: {}",
+ columnWidthInfo.outputVV.getField().getName(), width, metadataWidth);
+ width += metadataWidth;
}
totalVariableColumnWidth += width;
}
@@ -301,4 +324,21 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
updateIncomingStats();
}
+
+ public static int getMetadataWidth(ValueVector vv) {
+ int width = 0;
+ if (vv instanceof NullableVector) {
+ width += ((NullableVector)vv).getBitsVector().getPayloadByteCount(1);
+ }
+
+ if (vv instanceof VariableWidthVector) {
+ width += ((VariableWidthVector)vv).getOffsetVector().getPayloadByteCount(1);
+ }
+
+ if (vv instanceof BaseRepeatedValueVector) {
+ width += ((BaseRepeatedValueVector)vv).getOffsetVector().getPayloadByteCount(1);
+ width += (getMetadataWidth(((BaseRepeatedValueVector)vv).getDataVector()) * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+ }
+ return width;
+ }
}