You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/25 06:15:03 UTC

[GitHub] Ben-Zvi closed pull request #1442: DRILL-6688 Data batches for Project operator exceed the maximum specified

Ben-Zvi closed pull request #1442: DRILL-6688 Data batches for Project operator exceed the maximum specified
URL: https://github.com/apache/drill/pull/1442
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
index 7c0bb380aa2..cb6ff820481 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
@@ -59,7 +59,7 @@ public int getOutputWidth(List<FixedLenExpr> args) {
                 throw new IllegalArgumentException();
             }
             for (FixedLenExpr expr : args) {
-                outputSize += expr.getWidth();
+                outputSize += expr.getDataWidth();
             }
             outputSize = adjustOutputWidth(outputSize, "ConcatOutputWidthCalculator:");
             return outputSize;
@@ -85,7 +85,7 @@ public int getOutputWidth(List<FixedLenExpr> args) {
             if (args == null || args.size() < 1) {
                 throw new IllegalArgumentException();
             }
-            outputSize = args.get(0).getWidth();
+            outputSize = args.get(0).getDataWidth();
             outputSize = adjustOutputWidth(outputSize, "CloneOutputWidthCalculator:");
             return outputSize;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index 84a3f46f886..2d5f640711e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -133,11 +133,18 @@ public String getInputColumnName() {
      */
 
     public static class FixedLenExpr extends OutputWidthExpression {
-        int fixedWidth;
+        /**
+         * Only the width of the payload is saved in fixedDataWidth.
+         * Metadata width is added when the final output row size is calculated.
+         * This is to avoid function {@link OutputWidthCalculator} from using
+         * metadata width in the calculations.
+         */
+        private int fixedDataWidth;
+
         public FixedLenExpr(int fixedWidth) {
-            this.fixedWidth = fixedWidth;
+            this.fixedDataWidth = fixedWidth;
         }
-        public int getWidth() { return fixedWidth;}
+        public int getDataWidth() { return fixedDataWidth;}
 
         @Override
         public <T, V, E extends Exception> T accept(AbstractExecExprVisitor<T, V, E> visitor, V value) throws E {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index 70908bf2fcb..d53adc8a1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -212,7 +212,7 @@ public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr,
         }
         final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName);
 
-        int columnWidth = columnSize.getNetSizePerEntry();
+        int columnWidth = columnSize.getDataSizePerEntry();
         return new FixedLenExpr(columnWidth);
     }
 
@@ -256,12 +256,12 @@ public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExp
                                                         throws RuntimeException {
         OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state);
         assert ifReducedExpr instanceof FixedLenExpr;
-        int ifWidth = ((FixedLenExpr)ifReducedExpr).getWidth();
+        int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
         int elseWidth = -1;
         if (ifElseWidthExpr.expressions[1] != null) {
             OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state);
             assert elseReducedExpr instanceof FixedLenExpr;
-            elseWidth = ((FixedLenExpr)elseReducedExpr).getWidth();
+            elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
         }
         int outputWidth = Math.max(ifWidth, elseWidth);
         return new FixedLenExpr(outputWidth);
@@ -270,8 +270,10 @@ public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExp
     private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
         MajorType type = majorType;
         if (Types.isFixedWidthType(type)) {
-            int fixedWidth = ProjectMemoryManager.getWidthOfFixedWidthType(type);
-            return new OutputWidthExpression.FixedLenExpr(fixedWidth);
+            // Use only the width of the data. Metadata width will be accounted for at the end
+            // This is to avoid using metadata size in intermediate calculations
+            int fixedDataWidth = ProjectMemoryManager.getDataWidthOfFixedWidthType(type);
+            return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
         }
         return null;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 03c849cfef6..c3a983b479a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -20,17 +20,21 @@
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.FixedLenExpr;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -93,15 +97,18 @@ public RecordBatch getIncomingBatch() {
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
+        ValueVector outputVV; // for transfers, this is the transfer src
+
 
         ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
-                        int fieldWidth) {
+                        int fieldWidth, ValueVector outputVV) {
             this.outputExpression = outputWidthExpression;
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
+            this.outputVV = outputVV;
         }
 
         public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -151,13 +158,12 @@ public ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
     static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof FixedWidthVector); }
 
 
-    static int getWidthOfFixedWidthType(ValueVector vv) {
+    static int getNetWidthOfFixedWidthType(ValueVector vv) {
         assert isFixedWidth(vv);
         return ((FixedWidthVector)vv).getValueWidth();
     }
 
-    public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
-        DataMode mode = majorType.getMode();
+    public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
         MinorType minorType = majorType.getMinorType();
         final boolean isVariableWidth  = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
                 || minorType == MinorType.VARBINARY);
@@ -166,12 +172,11 @@ public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
             throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
         }
 
-        final boolean isOptional = (mode == DataMode.OPTIONAL);
-        final boolean isRepeated = (mode == DataMode.REPEATED);
-        final boolean isRepeatedList = false; // repeated
-        final Map<String, RecordBatchSizer.ColumnSize> children = null;
+        if (minorType == MinorType.NULL) {
+            return 0;
+        }
 
-        return RecordBatchSizer.getStdNetSizePerEntryCommon(majorType, isOptional, isRepeated, isRepeatedList, children);
+        return TypeHelper.getSize(majorType);
     }
 
 
@@ -196,11 +201,13 @@ private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpr
                                        OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
+        logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
+                printVV(vv), variableWidthColumnCount, outputColumnType);
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
             VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
             columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
-                    WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
+                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
@@ -209,25 +216,37 @@ private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpr
             OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
             columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
-                    WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
+                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
         }
         ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
         Preconditions.checkState(existingInfo == null);
     }
 
+    public static String printVV(ValueVector vv) {
+        String str = "null";
+        if (vv != null) {
+            str = vv.getField().getName() + " " + vv.getField().getType();
+        }
+        return str;
+    }
+
     void addComplexField(ValueVector vv) {
         //Complex types are not yet supported. Just use a guess for the size
         assert vv == null || isComplex(vv.getField().getType());
         complexColumnsCount++;
         // just a guess
         totalComplexColumnWidth +=  OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
+        logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+                printVV(vv), complexColumnsCount, totalComplexColumnWidth);
     }
 
     void addFixedWidthField(ValueVector vv) {
         assert isFixedWidth(vv);
         fixedWidthColumnCount++;
-        int fixedFieldWidth = getWidthOfFixedWidthType(vv);
+        int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
         totalFixedWidthColumnWidth += fixedFieldWidth;
+        logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+                printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth);
     }
 
     public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
@@ -267,8 +286,12 @@ public void update() {
                 OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
                 OutputWidthVisitorState state = new OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
-                width = ((FixedLenExpr)reducedExpr).getWidth();
+                width = ((FixedLenExpr)reducedExpr).getDataWidth();
                 Preconditions.checkState(width >= 0);
+                int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
+                logger.trace("update(): fieldName {} width: {} metadataWidth: {}",
+                        columnWidthInfo.outputVV.getField().getName(), width, metadataWidth);
+                width += metadataWidth;
             }
             totalVariableColumnWidth += width;
         }
@@ -301,4 +324,21 @@ public void update() {
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
         updateIncomingStats();
     }
+
+    public static int getMetadataWidth(ValueVector vv) {
+        int width = 0;
+        if (vv instanceof NullableVector) {
+            width += ((NullableVector)vv).getBitsVector().getPayloadByteCount(1);
+        }
+
+        if (vv instanceof VariableWidthVector) {
+            width += ((VariableWidthVector)vv).getOffsetVector().getPayloadByteCount(1);
+        }
+
+        if (vv instanceof BaseRepeatedValueVector) {
+            width += ((BaseRepeatedValueVector)vv).getOffsetVector().getPayloadByteCount(1);
+            width += (getMetadataWidth(((BaseRepeatedValueVector)vv).getDataVector()) * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+        }
+        return width;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services