You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/05 18:21:05 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039932018


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * Composes the {@link TransferableBlock} returned by the leaf stage so that the V1 results match the projection
+   * schema expected by the calcite logical operator.
+   * <p> A different clean-up mechanism is applied depending on the concrete type of the {@link BaseResultsBlock} and
+   *     on the {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> This also applies to the canonicalization of the data types during the post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   * @throws IllegalArgumentException if the results block is none of the selection, aggregation, group-by or distinct
+   *     result block types
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock block = responseBlock.getResultsBlock();
+    // Dispatch on the concrete results block type; each branch returns as soon as it matches.
+    if (block instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    throw new IllegalArgumentException("Unsupported result block type: " + block);
+  }
+
+  /**
+   * Validates the distinct result of the leaf stage against the distinct columns of the query and passes it on
+   * without rearranging any column.
+   * <p> Columns would only need to be rearranged when distinct is not conforming with selection columns,
+   *     specifically:</p>
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @param responseBlock result block from leaf stage; its first aggregation function is cast to
+   *     {@link DistinctAggregationFunction}
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the block produced by {@code composeDirectTransferableBlock} for the desired schema
+   * @throws IllegalStateException if {@code inOrder} rejects the indices of the distinct columns in the result schema
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    // Validate against (and report) the schema actually produced by the leaf stage, as the group-by and
+    // aggregation variants do; previously the desired schema was printed as both "Expected" and "Actual".
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template overload: the schemas are only formatted into the message when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible distinct table schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Validates the group-by result of the leaf stage and passes it on without rearranging any column.
+   * <p> The calcite generated {@link DataSchema} should conform with Pinot's group by result schema, thus we only
+   *     need to check for correctness, similar to the distinct case.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the block produced by {@code composeDirectTransferableBlock} for the desired schema
+   * @throws IllegalStateException if {@code inOrder} rejects the indices of the select expressions in the result
+   *     schema
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema)
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conform with the string form of the select expressions
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(Object::toString).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template overload: the schemas are only formatted into the message when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible group by result schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Validates the aggregation-only result of the leaf stage and passes it on without rearranging any column.
+   * <p> The calcite generated {@link DataSchema} should conform with Pinot's agg result schema, thus we only need to
+   *     check for correctness, similar to the distinct case.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the block produced by {@code composeDirectTransferableBlock} for the desired schema
+   * @throws IllegalStateException if {@code inOrder} rejects the indices of the aggregation columns in the result
+   *     schema
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema)
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template overload: the schemas are only formatted into the message when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible aggregate result schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   Yeah, but if we don't have columnIndices, we can't do the inOrder check on columnIndices :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org