You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/13 03:45:12 UTC

[GitHub] sohami closed pull request #1375: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified

sohami closed pull request #1375: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
URL: https://github.com/apache/drill/pull/1375
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a pull request from a fork once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d68be0..84a3f46f886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public OutputWidthCalculator getCalculator() {
     }
 
     /**
-     * VarLenReadExpr captures the name of a variable length column that is used (read) in an expression.
-     * The captured name will be used to lookup the average entry size for the column in the corresponding
+     * VarLenReadExpr captures the inputColumnName and the readExpression used to read a variable length column.
+     * If inputColumnName is null then the readExpression is used to get the name of the column.
+     * The resulting column name will be used to look up the average entry size for the column in the corresponding
      * {@link org.apache.drill.exec.record.RecordBatchSizer}
      */
     public static class VarLenReadExpr extends OutputWidthExpression  {
         ValueVectorReadExpression readExpression;
-        String name;
+        String inputColumnName;
 
         public VarLenReadExpr(ValueVectorReadExpression readExpression) {
             this.readExpression = readExpression;
-            this.name = null;
+            this.inputColumnName = null;
         }
 
-        public VarLenReadExpr(String name) {
+        public VarLenReadExpr(String inputColumnName) {
             this.readExpression = null;
-            this.name = name;
+            this.inputColumnName = inputColumnName;
         }
 
         public ValueVectorReadExpression getReadExpression() {
             return readExpression;
         }
 
-        public String getName() {
-            return name;
+        public String getInputColumnName() {
+            return inputColumnName;
         }
 
         @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb587952304..70908bf2fcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public OutputWidthExpression visitFixedLenExpr(FixedLenExpr fixedLenExpr, Output
     @Override
     public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state)
                                                         throws RuntimeException {
-        String columnName = varLenReadExpr.getName();
+        String columnName = varLenReadExpr.getInputColumnName();
         if (columnName == null) {
             TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId();
             columnName =  TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1c9e6..e18c827128b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@
 public class OutputWidthVisitorState {
 
     ProjectMemoryManager manager;
-    ProjectMemoryManager.OutputColumnType outputColumnType;
 
-    public OutputWidthVisitorState(ProjectMemoryManager manager, ProjectMemoryManager.OutputColumnType outputColumnType) {
+    public OutputWidthVisitorState(ProjectMemoryManager manager) {
         this.manager = manager;
-        this.outputColumnType = outputColumnType;
     }
 
     public ProjectMemoryManager getManager() {
         return manager;
     }
 
-    public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
-        return outputColumnType;
-    }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b092281..03c849cfef6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public RecordBatch getIncomingBatch() {
     }
 
     class ColumnWidthInfo {
-        //MaterializedField materializedField;
         OutputWidthExpression outputExpression;
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
-        String name;
 
-        ColumnWidthInfo(ValueVector vv,
-                        OutputWidthExpression outputWidthExpression,
+        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
                         int fieldWidth) {
@@ -104,8 +102,6 @@ public RecordBatch getIncomingBatch() {
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
-            String columnName = vv.getField().getName();
-            this.name = columnName;
         }
 
         public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -116,7 +112,6 @@ public RecordBatch getIncomingBatch() {
 
         public int getWidth() { return width; }
 
-        public String getName() { return name; }
     }
 
     void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
     }
 
 
-    void addTransferField(ValueVector vvOut, String path) {
-        addField(vvOut, null, OutputColumnType.TRANSFER, path);
+    void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
+        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
     }
 
-    void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
-        addField(vv, logicalExpression, OutputColumnType.NEW, null);
+    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
     }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, String path) {
+    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
+                  String inputColumnName, String outputColumnName) {
         if(isFixedWidth(vv)) {
             addFixedWidthField(vv);
         } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, path);
+            addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
         }
     }
 
     private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
-                                       OutputColumnType outputColumnType, String path) {
+                                       OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
-            String columnName = path;
-            VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
-            columnWidthInfo = new ColumnWidthInfo(vv, readExpr, outputColumnType,
+            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
         } else {
             // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this, outputColumnType);
+            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, outputColumnType,
+            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
         }
-        outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
+        Preconditions.checkState(existingInfo == null);
     }
 
     void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public void update() {
         setRecordBatchSizer(batchSizer);
         rowWidth = 0;
         int totalVariableColumnWidth = 0;
-        for (String expr : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+        for (String outputColumnName : outputColumnSizes.keySet()) {
+            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName);
             int width = -1;
             if (columnWidthInfo.isFixedWidth()) {
                 // fixed width columns are accumulated in totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public void update() {
                 //As the tree is walked, the RecordBatchSizer and function annotations
                 //are looked-up to come up with the final FixedLenExpr
                 OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
-                OutputColumnType columnType = columnWidthInfo.getOutputColumnType();
-                OutputWidthVisitorState state = new OutputWidthVisitorState(this, columnType);
+                OutputWidthVisitorState state = new OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
-                assert reducedExpr instanceof FixedLenExpr;
                 width = ((FixedLenExpr)reducedExpr).getWidth();
-                assert width >= 0;
+                Preconditions.checkState(width >= 0);
             }
             totalVariableColumnWidth += width;
         }
@@ -301,7 +295,7 @@ public void update() {
         logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}"
                     + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {}  ms"
                     + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
+                    rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0b1b7..dd933250a2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ private void clear() {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
   }
 
   @Override
@@ -367,6 +362,9 @@ private boolean isWildcard(final NamedExpression ex) {
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
     long setupNewSchemaStartTime = System.currentTimeMillis();
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
     memoryManager.init(incomingBatch, this);
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
               final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
                 vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName());
               transfers.add(tp);
             }
           } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
           container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
             vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch));
+        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName());
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
         memoryManager.addComplexField(null); // this will just add an estimate to the row width
       } else {
         // need to do evaluation.
-        final ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
+        final ValueVector ouputVector = container.addOrGet(outputField, callBack);
+        allocationVectors.add(ouputVector);
         final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(vector, write);
+        memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
         if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
             final TypedFieldId id = vectorRead.getFieldId();
             final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(),
                     id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(vector);
+            vvIn.makeTransferPair(ouputVector);
           }
         }
       }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services