You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/08/25 06:15:06 UTC

[drill] branch master updated: DRILL-6688 Data batches for Project operator exceed the maximum specified (#1442)

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb35ce  DRILL-6688 Data batches for Project operator exceed the maximum specified (#1442)
ddb35ce is described below

commit ddb35ce71837376c7caef28c25327ba556bb32f2
Author: Karthikeyan Manivannan <km...@mapr.com>
AuthorDate: Fri Aug 24 23:15:02 2018 -0700

    DRILL-6688 Data batches for Project operator exceed the maximum specified (#1442)
    
    
    This change separates the metadata width from the data width of a variable-width column, so that only the data width is used in all intermediate calculations and the metadata width is added only at the end, when the column's width is accumulated into the total width.
---
 .../expr/fn/output/OutputWidthCalculators.java     |  4 +-
 .../impl/project/OutputWidthExpression.java        | 13 ++++-
 .../physical/impl/project/OutputWidthVisitor.java  | 12 ++--
 .../impl/project/ProjectMemoryManager.java         | 68 +++++++++++++++++-----
 4 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
index 7c0bb38..cb6ff82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/OutputWidthCalculators.java
@@ -59,7 +59,7 @@ public class OutputWidthCalculators {
                 throw new IllegalArgumentException();
             }
             for (FixedLenExpr expr : args) {
-                outputSize += expr.getWidth();
+                outputSize += expr.getDataWidth();
             }
             outputSize = adjustOutputWidth(outputSize, "ConcatOutputWidthCalculator:");
             return outputSize;
@@ -85,7 +85,7 @@ public class OutputWidthCalculators {
             if (args == null || args.size() < 1) {
                 throw new IllegalArgumentException();
             }
-            outputSize = args.get(0).getWidth();
+            outputSize = args.get(0).getDataWidth();
             outputSize = adjustOutputWidth(outputSize, "CloneOutputWidthCalculator:");
             return outputSize;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index 84a3f46..2d5f640 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -133,11 +133,18 @@ public abstract class OutputWidthExpression {
      */
 
     public static class FixedLenExpr extends OutputWidthExpression {
-        int fixedWidth;
+        /**
+         * Only the width of the payload is saved in fixedDataWidth.
+         * Metadata width is added when the final output row size is calculated.
+         * This prevents the {@link OutputWidthCalculator} functions from using
+         * the metadata width in their calculations.
+         */
+        private int fixedDataWidth;
+
         public FixedLenExpr(int fixedWidth) {
-            this.fixedWidth = fixedWidth;
+            this.fixedDataWidth = fixedWidth;
         }
-        public int getWidth() { return fixedWidth;}
+        public int getDataWidth() { return fixedDataWidth;}
 
         @Override
         public <T, V, E extends Exception> T accept(AbstractExecExprVisitor<T, V, E> visitor, V value) throws E {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index 70908bf..d53adc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -212,7 +212,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
         }
         final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName);
 
-        int columnWidth = columnSize.getNetSizePerEntry();
+        int columnWidth = columnSize.getDataSizePerEntry();
         return new FixedLenExpr(columnWidth);
     }
 
@@ -256,12 +256,12 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
                                                         throws RuntimeException {
         OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state);
         assert ifReducedExpr instanceof FixedLenExpr;
-        int ifWidth = ((FixedLenExpr)ifReducedExpr).getWidth();
+        int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
         int elseWidth = -1;
         if (ifElseWidthExpr.expressions[1] != null) {
             OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state);
             assert elseReducedExpr instanceof FixedLenExpr;
-            elseWidth = ((FixedLenExpr)elseReducedExpr).getWidth();
+            elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
         }
         int outputWidth = Math.max(ifWidth, elseWidth);
         return new FixedLenExpr(outputWidth);
@@ -270,8 +270,10 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
     private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
         MajorType type = majorType;
         if (Types.isFixedWidthType(type)) {
-            int fixedWidth = ProjectMemoryManager.getWidthOfFixedWidthType(type);
-            return new OutputWidthExpression.FixedLenExpr(fixedWidth);
+            // Use only the width of the data. The metadata width is accounted for at the end,
+            // so that it is not used in intermediate calculations.
+            int fixedDataWidth = ProjectMemoryManager.getDataWidthOfFixedWidthType(type);
+            return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
         }
         return null;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 03c849c..c3a983b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -20,17 +20,21 @@ package org.apache.drill.exec.physical.impl.project;
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.FixedLenExpr;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -93,15 +97,18 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
+        ValueVector outputVV; // for transfers, this is the transfer src
+
 
         ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
-                        int fieldWidth) {
+                        int fieldWidth, ValueVector outputVV) {
             this.outputExpression = outputWidthExpression;
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
+            this.outputVV = outputVV;
         }
 
         public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -151,13 +158,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof FixedWidthVector); }
 
 
-    static int getWidthOfFixedWidthType(ValueVector vv) {
+    static int getNetWidthOfFixedWidthType(ValueVector vv) {
         assert isFixedWidth(vv);
         return ((FixedWidthVector)vv).getValueWidth();
     }
 
-    public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
-        DataMode mode = majorType.getMode();
+    public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
         MinorType minorType = majorType.getMinorType();
         final boolean isVariableWidth  = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
                 || minorType == MinorType.VARBINARY);
@@ -166,12 +172,11 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
             throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
         }
 
-        final boolean isOptional = (mode == DataMode.OPTIONAL);
-        final boolean isRepeated = (mode == DataMode.REPEATED);
-        final boolean isRepeatedList = false; // repeated
-        final Map<String, RecordBatchSizer.ColumnSize> children = null;
+        if (minorType == MinorType.NULL) {
+            return 0;
+        }
 
-        return RecordBatchSizer.getStdNetSizePerEntryCommon(majorType, isOptional, isRepeated, isRepeatedList, children);
+        return TypeHelper.getSize(majorType);
     }
 
 
@@ -196,11 +201,13 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                                        OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
+        logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
+                printVV(vv), variableWidthColumnCount, outputColumnType);
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
             VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
             columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
-                    WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
+                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
@@ -209,25 +216,37 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
             OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
             columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
-                    WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
+                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
         }
         ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
         Preconditions.checkState(existingInfo == null);
     }
 
+    public static String printVV(ValueVector vv) {
+        String str = "null";
+        if (vv != null) {
+            str = vv.getField().getName() + " " + vv.getField().getType();
+        }
+        return str;
+    }
+
     void addComplexField(ValueVector vv) {
         //Complex types are not yet supported. Just use a guess for the size
         assert vv == null || isComplex(vv.getField().getType());
         complexColumnsCount++;
         // just a guess
         totalComplexColumnWidth +=  OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
+        logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+                printVV(vv), complexColumnsCount, totalComplexColumnWidth);
     }
 
     void addFixedWidthField(ValueVector vv) {
         assert isFixedWidth(vv);
         fixedWidthColumnCount++;
-        int fixedFieldWidth = getWidthOfFixedWidthType(vv);
+        int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
         totalFixedWidthColumnWidth += fixedFieldWidth;
+        logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+                printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth);
     }
 
     public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
@@ -267,8 +286,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                 OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
                 OutputWidthVisitorState state = new OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
-                width = ((FixedLenExpr)reducedExpr).getWidth();
+                width = ((FixedLenExpr)reducedExpr).getDataWidth();
                 Preconditions.checkState(width >= 0);
+                int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
+                logger.trace("update(): fieldName {} width: {} metadataWidth: {}",
+                        columnWidthInfo.outputVV.getField().getName(), width, metadataWidth);
+                width += metadataWidth;
             }
             totalVariableColumnWidth += width;
         }
@@ -301,4 +324,21 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
         updateIncomingStats();
     }
+
+    public static int getMetadataWidth(ValueVector vv) {
+        int width = 0;
+        if (vv instanceof NullableVector) {
+            width += ((NullableVector)vv).getBitsVector().getPayloadByteCount(1);
+        }
+
+        if (vv instanceof VariableWidthVector) {
+            width += ((VariableWidthVector)vv).getOffsetVector().getPayloadByteCount(1);
+        }
+
+        if (vv instanceof BaseRepeatedValueVector) {
+            width += ((BaseRepeatedValueVector)vv).getOffsetVector().getPayloadByteCount(1);
+            width += (getMetadataWidth(((BaseRepeatedValueVector)vv).getDataVector()) * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+        }
+        return width;
+    }
 }