Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/05 18:24:10 UTC

[GitHub] [pinot] agavra commented on a diff in pull request #9892: [multistage] Fix leaf stage return

agavra commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039922437


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * This data transfer block compose method ensures that V1 results match the expected projection schema of the
+   * Calcite logical operator.
+   * <p> It applies a different clean-up mechanism based on the type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> It also canonicalizes the data types during the post-process step.</p>
+   *
+   * @param responseBlock result block from the leaf stage
+   * @param desiredDataSchema the desired schema for the send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * We only need to rearrange columns when the distinct result does not conform with the selection columns,
+   * specifically:
+   * <ul>
+   *   <li> when distinct does not return the final result:
+   *       this should never happen, as a non-final result contains opaque Object columns the v2 engine can't process.</li>
+   *   <li> when the distinct columns are not all selected:
+   *       this should never happen, as the leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());

Review Comment:
   is it always the case that the first aggregation will be the distinct column? what if I had: `SELECT DISTINCT MAX(foo), bar FROM tbl GROUP BY bar` (or would that not return a DistinctBlock)?
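
   For readers following along, here is a minimal sketch of how that assumption could be checked explicitly rather
   than relied upon. This is an editorial illustration, not code from the PR: the helper name
   extractDistinctColumns is hypothetical, and it assumes the class's existing imports for the Pinot types
   referenced in the hunk above.

       // Guard the leaf-stage DISTINCT assumption instead of blindly casting the first aggregation function.
       private static List<String> extractDistinctColumns(InstanceResponseBlock responseBlock) {
         AggregationFunction[] aggregationFunctions = responseBlock.getQueryContext().getAggregationFunctions();
         Preconditions.checkState(aggregationFunctions != null && aggregationFunctions.length == 1
                 && aggregationFunctions[0] instanceof DistinctAggregationFunction,
             "Expected a DistinctResultsBlock to carry exactly one DistinctAggregationFunction");
         return Arrays.asList(((DistinctAggregationFunction) aggregationFunctions[0]).getColumns());
       }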



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * This data transfer block compose method ensures that V1 results match the expected projection schema of the
+   * Calcite logical operator.
+   * <p> It applies a different clean-up mechanism based on the type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> It also canonicalizes the data types during the post-process step.</p>
+   *
+   * @param responseBlock result block from the leaf stage
+   * @param desiredDataSchema the desired schema for the send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * We only need to rearrange columns when the distinct result does not conform with the selection columns,
+   * specifically:
+   * <ul>
+   *   <li> when distinct does not return the final result:
+   *       this should never happen, as a non-final result contains opaque Object columns the v2 engine can't process.</li>
+   *   <li> when the distinct columns are not all selected:
+   *       this should never happen, as the leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * The Calcite-generated {@link DataSchema} should conform with Pinot's group-by result schema, thus we only need
+   * to check for correctness, similar to the distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conform with the selection expressions

Review Comment:
   what happens if you group by an expression? (e.g. `SELECT SUBSTR(foo, 5), SUM(bar) FROM tbl GROUP BY SUBSTR(foo, 5)`)
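
   A note for readers following the schema check under discussion: the hunk relies on a private inOrder helper that
   is not shown here. A plausible reading of what the check amounts to, as an editorial sketch only (the actual
   helper may differ):

       // Plausible shape of the inOrder(...) helper referenced above: the check passes only when every
       // selection column already sits at the index the schema expects, i.e. no reordering is needed.
       private static boolean inOrder(int[] columnIndices) {
         for (int i = 0; i < columnIndices.length; i++) {
           if (columnIndices[i] != i) {
             return false;
           }
         }
         return true;
       }
       // For the expression group-by above, the string form of SUBSTR(foo, 5) produced by the selection
       // expressions must match the v1 result-schema column name for the indices to come out as [0, 1].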



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * This data transfer block compose method ensures that V1 results match the expected projection schema of the
+   * Calcite logical operator.
+   * <p> It applies a different clean-up mechanism based on the type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> It also canonicalizes the data types during the post-process step.</p>
+   *
+   * @param responseBlock result block from the leaf stage
+   * @param desiredDataSchema the desired schema for the send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * We only need to rearrange columns when the distinct result does not conform with the selection columns,
+   * specifically:
+   * <ul>
+   *   <li> when distinct does not return the final result:
+   *       this should never happen, as a non-final result contains opaque Object columns the v2 engine can't process.</li>
+   *   <li> when the distinct columns are not all selected:
+   *       this should never happen, as the leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * The Calcite-generated {@link DataSchema} should conform with Pinot's group-by result schema, thus we only need
+   * to check for correctness, similar to the distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conform with the selection expressions
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * The Calcite-generated {@link DataSchema} should conform with Pinot's aggregation result schema, thus we only
+   * need to check for correctness, similar to the distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());

Review Comment:
   are these two used anywhere?
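
   If they are indeed unused, the branch could read as below once the two locals are dropped (editorial sketch of
   the cleanup, not a change from the PR; composeColumnIndexedTransferableBlock already extracts the rows itself):

       if (!inOrder(columnIndices)) {
         DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
         Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
             adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
             + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
         return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
       } else {
         return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
       }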



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * This data transfer block compose method ensures that V1 results match the expected projection schema of the
+   * Calcite logical operator.
+   * <p> It applies a different clean-up mechanism based on the type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> It also canonicalizes the data types during the post-process step.</p>
+   *
+   * @param responseBlock result block from the leaf stage
+   * @param desiredDataSchema the desired schema for the send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * We only need to rearrange columns when the distinct result does not conform with the selection columns,
+   * specifically:
+   * <ul>
+   *   <li> when distinct does not return the final result:
+   *       this should never happen, as a non-final result contains opaque Object columns the v2 engine can't process.</li>
+   *   <li> when the distinct columns are not all selected:
+   *       this should never happen, as the leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * The Calcite-generated {@link DataSchema} should conform with Pinot's group-by result schema, thus we only need
+   * to check for correctness, similar to the distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conform with the selection expressions
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * The Calcite-generated {@link DataSchema} should conform with Pinot's aggregation result schema, thus we only
+   * need to check for correctness, similar to the distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+      return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
+    } else {
+      return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+  }
+
+  /**
+   * Creates a {@link TransferableBlock} using column indices.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema, int[] columnIndices) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (responseBlock.getQueryContext().getOrderByExpressions() != null) {
+      // extract result row in ordered fashion
+      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+      while (!priorityQueue.isEmpty()) {
+        extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema, columnIndices));
+      }
+    } else {
+      // extract result row in non-ordered fashion
+      for (Object[] row : resultRows) {
+        extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
+      }
+    }
+    return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Fallback mechanism for {@link TransferableBlock}, used when no special handling is necessary. This method only
+   * performs {@link DataSchema.ColumnDataType} canonicalization.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (resultRows instanceof List) {
+      for (Object[] orgRow : resultRows) {
+        extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
+      }
+    } else if (resultRows instanceof PriorityQueue) {

Review Comment:
   personally I prefer the type safety this gives us over the potential performance benefit of not using instanceof. since this happens once-per-block, and blocks sent from leaves are pretty large, I'm not sure there is a real performance concern
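
   For readers skimming the thread, the two dispatch styles being compared are, side by side (editorial sketch
   using the names from the hunk above):

       // composeColumnIndexedTransferableBlock: branch on the query shape, then cast unconditionally.
       if (responseBlock.getQueryContext().getOrderByExpressions() != null) {
         PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
         // ... poll rows in sorted order
       }

       // composeDirectTransferableBlock: branch on the runtime collection type, so the instanceof
       // check itself guards the subsequent handling of the PriorityQueue.
       if (resultRows instanceof PriorityQueue) {
         // ... poll rows in sorted order
       }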



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org