Posted to issues@drill.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/01/02 19:40:01 UTC

[jira] [Commented] (DRILL-7503) Refactor project operator

    [ https://issues.apache.org/jira/browse/DRILL-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007031#comment-17007031 ] 

ASF GitHub Bot commented on DRILL-7503:
---------------------------------------

ihuzenko commented on pull request #1944: DRILL-7503: Refactor the project operator
URL: https://github.com/apache/drill/pull/1944#discussion_r362500067
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
 ##########
 @@ -42,307 +44,310 @@
 import java.util.Map;
 
 /**
- *
- * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by ProjectRecordBatch.
- * The PMM works as follows:
- *
- * Setup phase: As and when ProjectRecordBatch creates or transfers a field, it registers the field with PMM.
- * If the field is a variable width field, PMM records the expression that produces the variable
- * width field. The expression is a tree of LogicalExpressions. The PMM walks this tree of LogicalExpressions
- * to produce a tree of OutputWidthExpressions. The widths of Fixed width fields are just accumulated into a single
- * total. Note: The PMM, currently, cannot handle new complex fields, it just uses a hard-coded estimate for such fields.
- *
- *
- * Execution phase: Just before a batch is processed by Project, the PMM walks the tree of OutputWidthExpressions
- * and converts them to FixedWidthExpressions. It uses the RecordBatchSizer and the function annotations to do this conversion.
- * See OutputWidthVisitor for details.
+ * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by
+ * ProjectRecordBatch. The PMM works as follows:
+ * <p>
+ * Setup phase: As and when ProjectRecordBatch creates or transfers a field, it
+ * registers the field with PMM. If the field is a variable width field, PMM
+ * records the expression that produces the variable width field. The expression
+ * is a tree of LogicalExpressions. The PMM walks this tree of
+ * LogicalExpressions to produce a tree of OutputWidthExpressions. The widths of
+ * Fixed width fields are just accumulated into a single total. Note: The PMM,
+ * currently, cannot handle new complex fields, it just uses a hard-coded
+ * estimate for such fields.
+ * <p>
+ * Execution phase: Just before a batch is processed by Project, the PMM walks
+ * the tree of OutputWidthExpressions and converts them to
+ * FixedWidthExpressions. It uses the RecordBatchSizer and the function
+ * annotations to do this conversion. See OutputWidthVisitor for details.
  */
 public class ProjectMemoryManager extends RecordBatchMemoryManager {
 
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectMemoryManager.class);
-
-    public RecordBatch getIncomingBatch() {
-        return incomingBatch;
+  static final Logger logger = LoggerFactory.getLogger(ProjectMemoryManager.class);
+
+  private RecordBatch incomingBatch;
+  private ProjectRecordBatch outgoingBatch;
+
+  private int rowWidth;
+  private final Map<String, ColumnWidthInfo> outputColumnSizes;
+  // Number of variable width columns in the batch
+  private int variableWidthColumnCount;
+  // Number of fixed width columns in the batch
+  private int fixedWidthColumnCount;
+  // Number of complex columns in the batch
+  private int complexColumnsCount;
+
+  // Holds sum of all fixed width column widths
+  private int totalFixedWidthColumnWidth;
+  // Holds sum of all complex column widths
+  // Currently, this is just a guess
+  private int totalComplexColumnWidth;
+
+  private enum WidthType {
+      FIXED,
+      VARIABLE
+  }
+
+  public enum OutputColumnType {
+      TRANSFER,
+      NEW
+  }
+
+  public static class ColumnWidthInfo {
+    private final OutputWidthExpression outputExpression;
+    private final int width;
+    private final WidthType widthType;
+    private final OutputColumnType outputColumnType;
+    private final ValueVector outputVV; // for transfers, this is the transfer src
+
+
+    ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
+                    OutputColumnType outputColumnType,
+                    WidthType widthType,
+                    int fieldWidth, ValueVector outputVV) {
+      this.outputExpression = outputWidthExpression;
+      this.width = fieldWidth;
+      this.outputColumnType = outputColumnType;
+      this.widthType = widthType;
+      this.outputVV = outputVV;
     }
 
-    RecordBatch incomingBatch = null;
-    ProjectRecordBatch outgoingBatch = null;
+    public OutputWidthExpression getOutputExpression() { return outputExpression; }
 
-    int rowWidth = 0;
-    Map<String, ColumnWidthInfo> outputColumnSizes;
-    // Number of variable width columns in the batch
-    int variableWidthColumnCount = 0;
-    // Number of fixed width columns in the batch
-    int fixedWidthColumnCount = 0;
-    // Number of complex columns in the batch
-    int complexColumnsCount = 0;
+    public OutputColumnType getOutputColumnType() { return outputColumnType; }
 
+    public boolean isFixedWidth() { return widthType == WidthType.FIXED; }
 
-    // Holds sum of all fixed width column widths
-    int totalFixedWidthColumnWidth = 0;
-    // Holds sum of all complex column widths
-    // Currently, this is just a guess
-    int totalComplexColumnWidth = 0;
-
-    enum WidthType {
-        FIXED,
-        VARIABLE
-    }
-
-    enum OutputColumnType {
-        TRANSFER,
-        NEW
-    }
+    public int getWidth() { return width; }
+  }
 
-    class ColumnWidthInfo {
-        OutputWidthExpression outputExpression;
-        int width;
-        WidthType widthType;
-        OutputColumnType outputColumnType;
-        ValueVector outputVV; // for transfers, this is the transfer src
+  public RecordBatch getIncomingBatch() {
+    return incomingBatch;
+  }
 
+  void ShouldNotReachHere() {
+    throw new IllegalStateException();
+  }
 
-        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
-                        OutputColumnType outputColumnType,
-                        WidthType widthType,
-                        int fieldWidth, ValueVector outputVV) {
-            this.outputExpression = outputWidthExpression;
-            this.width = fieldWidth;
-            this.outputColumnType = outputColumnType;
-            this.widthType = widthType;
-            this.outputVV = outputVV;
-        }
+  private void setIncomingBatch(RecordBatch recordBatch) {
+    incomingBatch = recordBatch;
+  }
 
-        public OutputWidthExpression getOutputExpression() { return outputExpression; }
+  public RecordBatch incomingBatch() { return incomingBatch; }
 
-        public OutputColumnType getOutputColumnType() { return outputColumnType; }
+  private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) {
+    this.outgoingBatch = outgoingBatch;
+  }
 
-        boolean isFixedWidth() { return widthType == WidthType.FIXED; }
+  public ProjectMemoryManager(int configuredOutputSize) {
+    super(configuredOutputSize);
+    outputColumnSizes = new HashMap<>();
+  }
 
-        public int getWidth() { return width; }
-
-    }
-
-    void ShouldNotReachHere() {
-        throw new IllegalStateException();
-    }
-
-    private void setIncomingBatch(RecordBatch recordBatch) {
-        incomingBatch = recordBatch;
-    }
-
-    private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) {
-        this.outgoingBatch = outgoingBatch;
-    }
+  public boolean isComplex(MajorType majorType) {
+    MinorType minorType = majorType.getMinorType();
+    return minorType == MinorType.MAP || minorType == MinorType.UNION || minorType == MinorType.LIST;
+  }
 
-    public ProjectMemoryManager(int configuredOutputSize) {
-        super(configuredOutputSize);
-        outputColumnSizes = new HashMap<>();
-    }
+  boolean isFixedWidth(TypedFieldId fieldId) {
+    ValueVector vv = getOutgoingValueVector(fieldId);
+    return isFixedWidth(vv);
+  }
 
-    public boolean isComplex(MajorType majorType) {
-        MinorType minorType = majorType.getMinorType();
-        return minorType == MinorType.MAP || minorType == MinorType.UNION || minorType == MinorType.LIST;
-    }
+  public ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
+    Class<?> clazz = fieldId.getIntermediateClass();
+    int[] fieldIds = fieldId.getFieldIds();
+    return outgoingBatch.getValueAccessorById(clazz, fieldIds).getValueVector();
+  }
 
-    boolean isFixedWidth(TypedFieldId fieldId) {
-        ValueVector vv = getOutgoingValueVector(fieldId);
-        return isFixedWidth(vv);
-    }
+  static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof FixedWidthVector); }
 
-    public ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
-        Class<?> clazz = fieldId.getIntermediateClass();
-        int[] fieldIds = fieldId.getFieldIds();
-        return outgoingBatch.getValueAccessorById(clazz, fieldIds).getValueVector();
-    }
 
-    static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof FixedWidthVector); }
+  static int getNetWidthOfFixedWidthType(ValueVector vv) {
+    assert isFixedWidth(vv);
+    return ((FixedWidthVector)vv).getValueWidth();
+  }
 
+  public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
+    MinorType minorType = majorType.getMinorType();
+    final boolean isVariableWidth  = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
+            || minorType == MinorType.VARBINARY);
 
-    static int getNetWidthOfFixedWidthType(ValueVector vv) {
-        assert isFixedWidth(vv);
-        return ((FixedWidthVector)vv).getValueWidth();
+    if (isVariableWidth) {
+      throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
     }
 
-    public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
-        MinorType minorType = majorType.getMinorType();
-        final boolean isVariableWidth  = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
-                || minorType == MinorType.VARBINARY);
-
-        if (isVariableWidth) {
-            throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
-        }
-
-        if (minorType == MinorType.NULL) {
-            return 0;
-        }
-
-        return TypeHelper.getSize(majorType);
+    if (minorType == MinorType.NULL) {
+      return 0;
     }
 
+    return TypeHelper.getSize(majorType);
+  }
 
-    void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
-        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
-    }
 
-    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
-        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
-    }
+  void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
+    addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
+  }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
-                  String inputColumnName, String outputColumnName) {
-        if(isFixedWidth(vv)) {
-            addFixedWidthField(vv);
-        } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
-        }
-    }
+  void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+    addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
+  }
 
-    private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
-                                       OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
-        variableWidthColumnCount++;
-        ColumnWidthInfo columnWidthInfo;
-        logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
-                printVV(vv), variableWidthColumnCount, outputColumnType);
-        //Variable width transfers
-        if(outputColumnType == OutputColumnType.TRANSFER) {
-            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
-            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
-                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
-        } else if (isComplex(vv.getField().getType())) {
-            addComplexField(vv);
-            return;
-        } else {
-            // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
-            OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
-                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
-        }
-        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
-        Preconditions.checkState(existingInfo == null);
+  void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
+                String inputColumnName, String outputColumnName) {
+    if(isFixedWidth(vv)) {
+      addFixedWidthField(vv);
+    } else {
+      addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
     }
-
-    public static String printVV(ValueVector vv) {
-        String str = "null";
-        if (vv != null) {
-            str = vv.getField().getName() + " " + vv.getField().getType();
-        }
-        return str;
+  }
+
+  private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
+                                     OutputColumnType outputColumnType, String inputColumnName,
+                                     String outputColumnName) {
+    variableWidthColumnCount++;
+    ColumnWidthInfo columnWidthInfo;
+    logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
+            printVV(vv), variableWidthColumnCount, outputColumnType);
+    // Variable width transfers
+    if (outputColumnType == OutputColumnType.TRANSFER) {
+      VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+      columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
+              WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
+    } else if (isComplex(vv.getField().getType())) {
+      addComplexField(vv);
+      return;
+    } else {
+      // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
+      OutputWidthVisitorState state = new OutputWidthVisitorState(this);
+      OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
+      columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
+              WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
     }
-
-    void addComplexField(ValueVector vv) {
-        //Complex types are not yet supported. Just use a guess for the size
-        assert vv == null || isComplex(vv.getField().getType());
-        complexColumnsCount++;
-        // just a guess
-        totalComplexColumnWidth +=  OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
-        logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
-                printVV(vv), complexColumnsCount, totalComplexColumnWidth);
+    ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
+    Preconditions.checkState(existingInfo == null);
+  }
+
+  public static String printVV(ValueVector vv) {
+    String str = "null";
+    if (vv != null) {
+      str = vv.getField().getName() + " " + vv.getField().getType();
     }
-
-    void addFixedWidthField(ValueVector vv) {
-        assert isFixedWidth(vv);
-        fixedWidthColumnCount++;
-        int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
-        totalFixedWidthColumnWidth += fixedFieldWidth;
-        logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
-                printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth);
+    return str;
+  }
+
+  void addComplexField(ValueVector vv) {
+    //Complex types are not yet supported. Just use a guess for the size
+    assert vv == null || isComplex(vv.getField().getType());
+    complexColumnsCount++;
+    // just a guess
+    totalComplexColumnWidth +=  OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
+    logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
+            printVV(vv), complexColumnsCount, totalComplexColumnWidth);
+  }
+
+  void addFixedWidthField(ValueVector vv) {
+    assert isFixedWidth(vv);
+    fixedWidthColumnCount++;
+    int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
 
 Review comment:
   ```suggestion
       int fixedFieldWidth = ((FixedWidthVector) vv).getValueWidth();
   ```
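
For context, here is a minimal sketch of why the inlined cast appears to be equivalent to the helper call. In the diff above, `getNetWidthOfFixedWidthType()` only asserts `isFixedWidth(vv)` and then casts, and `addFixedWidthField()` has already made the same assertion on the preceding line. The nested `ValueVector` and `FixedWidthVector` interfaces and the class name `FixedWidthSketch` are hypothetical stand-ins so the example compiles on its own; they are not Drill's real classes.

```java
final class FixedWidthSketch {

  // Hypothetical stand-ins for Drill's vector types, reduced to what this example needs.
  interface ValueVector {}

  interface FixedWidthVector extends ValueVector {
    int getValueWidth();
  }

  static boolean isFixedWidth(ValueVector vv) {
    return vv instanceof FixedWidthVector;
  }

  // Shape of the helper shown in the diff: assert, then cast.
  static int getNetWidthOfFixedWidthType(ValueVector vv) {
    assert isFixedWidth(vv);
    return ((FixedWidthVector) vv).getValueWidth();
  }

  // Shape of the caller after the suggestion: the caller's own assert
  // already guards the cast, so the helper's assert would be a repeat.
  static int inlinedWidth(ValueVector vv) {
    assert isFixedWidth(vv);
    return ((FixedWidthVector) vv).getValueWidth();
  }

  public static void main(String[] args) {
    FixedWidthVector fourByteVector = () -> 4; // assumed 4-byte values, for illustration only
    System.out.println(getNetWidthOfFixedWidthType(fourByteVector));
    System.out.println(inlinedWidth(fourByteVector));
  }
}
```

Both paths return the same width for any fixed-width vector. Whether the helper is worth keeping for its other callers is a separate question that this sketch does not address.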
 


> Refactor project operator
> -------------------------
>
>                 Key: DRILL-7503
>                 URL: https://issues.apache.org/jira/browse/DRILL-7503
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.17.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>             Fix For: 1.18.0
>
>
> Work on another ticket revealed that the Project operator ("record batch") has grown quite complex. The setup phase lives in the operator as one huge function. The function combines the "logical" tasks of working out the projection expressions and types, the code gen for those expressions, and the physical setup of vectors.
> The refactoring breaks up the logic so that it is easier to focus on the specific bits of interest.


