You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:44:53 UTC

[drill] 01/13: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c64436774387e80fd9b0ff6e9cd7d42c9aa7a961
Author: karthik <km...@maprtech.com>
AuthorDate: Mon Jun 4 17:00:31 2018 -0700

    DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
    
    This change fixes the incorrect accounting in the case where a column is projected more than once
    
    closes #1375
---
 .../impl/project/OutputWidthExpression.java        | 17 ++++----
 .../physical/impl/project/OutputWidthVisitor.java  |  2 +-
 .../impl/project/OutputWidthVisitorState.java      |  7 +---
 .../impl/project/ProjectMemoryManager.java         | 48 ++++++++++------------
 .../physical/impl/project/ProjectRecordBatch.java  | 22 +++++-----
 5 files changed, 42 insertions(+), 54 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d6..84a3f46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public abstract class OutputWidthExpression {
     }
 
     /**
-     * VarLenReadExpr captures the name of a variable length column that is used (read) in an expression.
-     * The captured name will be used to lookup the average entry size for the column in the corresponding
+     * VarLenReadExpr captures the inputColumnName and the readExpression used to read a variable length column.
+     * If inputColumnName is null then the readExpression is used to get the name of the column.
+     * The column name will be used to look up the average entry size for the column in the corresponding
      * {@link org.apache.drill.exec.record.RecordBatchSizer}
      */
     public static class VarLenReadExpr extends OutputWidthExpression  {
         ValueVectorReadExpression readExpression;
-        String name;
+        String inputColumnName;
 
         public VarLenReadExpr(ValueVectorReadExpression readExpression) {
             this.readExpression = readExpression;
-            this.name = null;
+            this.inputColumnName = null;
         }
 
-        public VarLenReadExpr(String name) {
+        public VarLenReadExpr(String inputColumnName) {
             this.readExpression = null;
-            this.name = name;
+            this.inputColumnName = inputColumnName;
         }
 
         public ValueVectorReadExpression getReadExpression() {
             return readExpression;
         }
 
-        public String getName() {
-            return name;
+        public String getInputColumnName() {
+            return inputColumnName;
         }
 
         @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb58795..70908bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
     @Override
     public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state)
                                                         throws RuntimeException {
-        String columnName = varLenReadExpr.getName();
+        String columnName = varLenReadExpr.getInputColumnName();
         if (columnName == null) {
             TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId();
             columnName =  TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1..e18c827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@ package org.apache.drill.exec.physical.impl.project;
 public class OutputWidthVisitorState {
 
     ProjectMemoryManager manager;
-    ProjectMemoryManager.OutputColumnType outputColumnType;
 
-    public OutputWidthVisitorState(ProjectMemoryManager manager, ProjectMemoryManager.OutputColumnType outputColumnType) {
+    public OutputWidthVisitorState(ProjectMemoryManager manager) {
         this.manager = manager;
-        this.outputColumnType = outputColumnType;
     }
 
     public ProjectMemoryManager getManager() {
         return manager;
     }
 
-    public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
-        return outputColumnType;
-    }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b09..03c849c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     }
 
     class ColumnWidthInfo {
-        //MaterializedField materializedField;
         OutputWidthExpression outputExpression;
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
-        String name;
 
-        ColumnWidthInfo(ValueVector vv,
-                        OutputWidthExpression outputWidthExpression,
+        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
                         int fieldWidth) {
@@ -104,8 +102,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
-            String columnName = vv.getField().getName();
-            this.name = columnName;
         }
 
         public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -116,7 +112,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
 
         public int getWidth() { return width; }
 
-        public String getName() { return name; }
     }
 
     void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     }
 
 
-    void addTransferField(ValueVector vvOut, String path) {
-        addField(vvOut, null, OutputColumnType.TRANSFER, path);
+    void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
+        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
     }
 
-    void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
-        addField(vv, logicalExpression, OutputColumnType.NEW, null);
+    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
     }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, String path) {
+    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
+                  String inputColumnName, String outputColumnName) {
         if(isFixedWidth(vv)) {
             addFixedWidthField(vv);
         } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, path);
+            addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
         }
     }
 
     private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
-                                       OutputColumnType outputColumnType, String path) {
+                                       OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
-            String columnName = path;
-            VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
-            columnWidthInfo = new ColumnWidthInfo(vv, readExpr, outputColumnType,
+            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
         } else {
             // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this, outputColumnType);
+            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, outputColumnType,
+            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
         }
-        outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
+        Preconditions.checkState(existingInfo == null);
     }
 
     void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         setRecordBatchSizer(batchSizer);
         rowWidth = 0;
         int totalVariableColumnWidth = 0;
-        for (String expr : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+        for (String outputColumnName : outputColumnSizes.keySet()) {
+            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName);
             int width = -1;
             if (columnWidthInfo.isFixedWidth()) {
                 // fixed width columns are accumulated in totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                 //As the tree is walked, the RecordBatchSizer and function annotations
                 //are looked-up to come up with the final FixedLenExpr
                 OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
-                OutputColumnType columnType = columnWidthInfo.getOutputColumnType();
-                OutputWidthVisitorState state = new OutputWidthVisitorState(this, columnType);
+                OutputWidthVisitorState state = new OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
-                assert reducedExpr instanceof FixedLenExpr;
                 width = ((FixedLenExpr)reducedExpr).getWidth();
-                assert width >= 0;
+                Preconditions.checkState(width >= 0);
             }
             totalVariableColumnWidth += width;
         }
@@ -301,7 +295,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}"
                     + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {}  ms"
                     + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
+                    rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0..dd93325 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
   }
 
   @Override
@@ -367,6 +362,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
     long setupNewSchemaStartTime = System.currentTimeMillis();
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
     memoryManager.init(incomingBatch, this);
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
               final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
                 vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName());
               transfers.add(tp);
             }
           } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
             vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch));
+        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName());
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         memoryManager.addComplexField(null); // this will just add an estimate to the row width
       } else {
         // need to do evaluation.
-        final ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
+        final ValueVector ouputVector = container.addOrGet(outputField, callBack);
+        allocationVectors.add(ouputVector);
         final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(vector, write);
+        memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
         if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             final TypedFieldId id = vectorRead.getFieldId();
             final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(),
                     id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(vector);
+            vvIn.makeTransferPair(ouputVector);
           }
         }
       }