You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/01 20:25:35 UTC

[GitHub] [pinot] walterddr opened a new pull request, #9892: [multistage] Fix leaf stage return

walterddr opened a new pull request, #9892:
URL: https://github.com/apache/pinot/pull/9892

   this is a break-out PR from POC https://github.com/apache/pinot/pull/9886
   
   this PR creates leaf stage operator handling various different base result blocks that doesn't conform with calcite composed V2 leaf stage data schema
   - selection only is implemented
   - the rest is still placeholder
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039902939


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+      return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
+    } else {
+      return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+  }
+
+  /**
+   * Created {@link TransferableBlock} using column indices.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema, int[] columnIndices) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (responseBlock.getQueryContext().getOrderByExpressions() != null) {
+      // extract result row in ordered fashion
+      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+      while (!priorityQueue.isEmpty()) {
+        extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema, columnIndices));
+      }
+    } else {
+      // extract result row in non-ordered fashion
+      for (Object[] row : resultRows) {
+        extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
+      }
+    }
+    return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Fallback mechanism for {@link TransferableBlock}, used when no special handling is necessary. This method only
+   * performs {@link DataSchema.ColumnDataType} canonicalization.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (resultRows instanceof List) {
+      for (Object[] orgRow : resultRows) {
+        extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
+      }
+    } else if (resultRows instanceof PriorityQueue) {

Review Comment:
   (nit) instead of doing `instanceof` check, may be we should check the presence / absence of `ORDER BY` expressions ? Could be cheaper imo. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039955655


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());

Review Comment:
   good call out. was a refactor residual. they should all be in `composeColumnIndexedTransferableBlock` now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039954181


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression

Review Comment:
   good question. adding a test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039975333


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "

Review Comment:
   unit test is fine



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9892:
URL: https://github.com/apache/pinot/pull/9892#issuecomment-1336082626

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9892?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9892](https://codecov.io/gh/apache/pinot/pull/9892?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (808b422) into [master](https://codecov.io/gh/apache/pinot/commit/568d858f1654dc35cd07ad28978ead1161bd65d9?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (568d858) will **decrease** coverage by `45.08%`.
   > The diff coverage is `37.94%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9892       +/-   ##
   =============================================
   - Coverage     60.89%   15.81%   -45.09%     
   + Complexity     5376      175     -5201     
   =============================================
     Files          1966     1928       -38     
     Lines        105840   103911     -1929     
     Branches      16059    15832      -227     
   =============================================
   - Hits          64455    16436    -48019     
   - Misses        36617    86277    +49660     
   + Partials       4768     1198     -3570     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `15.81% <37.94%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9892?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/common/utils/config/TableConfigUtils.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhYmxlQ29uZmlnVXRpbHMuamF2YQ==) | `0.00% <0.00%> (-79.76%)` | :arrow_down: |
   | [...ata/manager/offline/DimensionTableDataManager.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvb2ZmbGluZS9EaW1lbnNpb25UYWJsZURhdGFNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (-77.78%)` | :arrow_down: |
   | [...data/manager/offline/FastLookupDimensionTable.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvb2ZmbGluZS9GYXN0TG9va3VwRGltZW5zaW9uVGFibGUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ore/data/manager/offline/LookupRecordLocation.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvb2ZmbGluZS9Mb29rdXBSZWNvcmRMb2NhdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...manager/offline/MemoryOptimizedDimensionTable.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvb2ZmbGluZS9NZW1vcnlPcHRpbWl6ZWREaW1lbnNpb25UYWJsZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/spi/config/table/DimensionTableConfig.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0RpbWVuc2lvblRhYmxlQ29uZmlnLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...org/apache/pinot/spi/config/table/TableConfig.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1RhYmxlQ29uZmlnLmphdmE=) | `0.00% <0.00%> (-65.66%)` | :arrow_down: |
   | [...he/pinot/spi/utils/builder/TableConfigBuilder.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvYnVpbGRlci9UYWJsZUNvbmZpZ0J1aWxkZXIuamF2YQ==) | `0.00% <0.00%> (-87.50%)` | :arrow_down: |
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `86.74% <50.00%> (+0.33%)` | :arrow_up: |
   | [...e/operator/LeafStageTransferableBlockOperator.java](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9MZWFmU3RhZ2VUcmFuc2ZlcmFibGVCbG9ja09wZXJhdG9yLmphdmE=) | `84.61% <85.88%> (+19.61%)` | :arrow_up: |
   | ... and [1664 more](https://codecov.io/gh/apache/pinot/pull/9892/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039933028


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "

Review Comment:
   these are not-expected to occur based on our current ServerStageVisitor. (e.g. the leaf-stage will never generate a query that returns a result not conforming with the asked schema). i can add a unit-test but will not be possible to add an e2e test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039901290


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   I think `selectionColumns` and `columnIndices` need to be retrieved only in the case of out-of-order / adjusted projection. 
   
   So may be we should move 210-212 inside the i`f (!inOrder(columnIndices))` branch



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   I think `selectionColumns` and `columnIndices` need to be retrieved only in the case of out-of-order / adjusted projection. 
   
   So may be we should move 210-212 inside the `if (!inOrder(columnIndices))` branch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039933028


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "

Review Comment:
   these are not-expected to occur based on our current ServerStageVisitor. (e.g. the leaf-stage will never generate a query that returns a result not conforming with the asked schema). i can add a unit-test but will not be possible to add an e2e test unless we fake a plan visitor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039897561


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "

Review Comment:
   Do we actually expect this to happen for some legit queries or this is just a sanity check for bugs etc ? Possible to add a test to trigger this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039932018


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   yeah but if we dont have columnIndices, we can't do inOrder check on columnIndices :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039933028


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "

Review Comment:
   these are not-expected to occur based on our current ServerStageVisitor. (e.g. the leaf-stage will never generate a query that returns a result not conforming with the asked schema.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039975067


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   sorry. my bad 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia merged pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
siddharthteotia merged PR #9892:
URL: https://github.com/apache/pinot/pull/9892


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039953945


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());

Review Comment:
   distinct aggregation is not a distinct result block but an agg result block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] agavra commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039922437


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());

Review Comment:
   is it always the case that the first aggregation will be the distinct column? what if I had: `SELECT DISTINCT MAX(foo), bar FROM tbl GROUP BY bar` (or would that not return a DistinctBlok)?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression

Review Comment:
   what happens if you group by an expression? (e.g. `SELECT SUBSTR(foo, 5), SUM(bar) FROM tbl GROUP BY SUBSTR(foo, 5)`)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());

Review Comment:
   are these two used anywhere?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+      return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
+    } else {
+      return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+  }
+
+  /**
+   * Created {@link TransferableBlock} using column indices.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema, int[] columnIndices) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (responseBlock.getQueryContext().getOrderByExpressions() != null) {
+      // extract result row in ordered fashion
+      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+      while (!priorityQueue.isEmpty()) {
+        extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema, columnIndices));
+      }
+    } else {
+      // extract result row in non-ordered fashion
+      for (Object[] row : resultRows) {
+        extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
+      }
+    }
+    return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Fallback mechanism for {@link TransferableBlock}, used when no special handling is necessary. This method only
+   * performs {@link DataSchema.ColumnDataType} canonicalization.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (resultRows instanceof List) {
+      for (Object[] orgRow : resultRows) {
+        extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
+      }
+    } else if (resultRows instanceof PriorityQueue) {

Review Comment:
   personally I prefer the type safety this gives us over the potential performance benefit of not using instanceof. since this happens once-per-block, and blocks sent from leaves are pretty large, I'm not sure there is a real performance concern



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9892: [multistage] Fix leaf stage return

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039932237


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
+   * schema in the calcite logical operator.
+   * <p> it applies different clean up mechanism based on different type of {@link BaseResultsBlock} and the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> this also applies to the canonicalization of the data types during post post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conform with the desiredDataSchema
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
+    if (resultsBlock instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    } else if (resultsBlock instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    } else {
+      throw new IllegalArgumentException("Unsupported result block type: " + resultsBlock);
+    }
+  }
+
+  /**
+   * we only need to rearrange columns when distinct is not conforming with selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + desiredDataSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conforms with selection expression
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(e -> e.toString()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema thus we only need to check
+   * for correctness similar to distinct case.
+   *
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema).
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
+        a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage."
+        + "Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arrange columns to match the projection in the case of select / order-by, when the desiredDataSchema
+   * doesn't conform with the result block schema exactly.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
+        resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (!inOrder(columnIndices)) {
+      DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+          adjustedResultSchema.getColumnDataTypes()), "Incompatible result data schema: "
+          + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+      // Extract the result rows
+      Collection<Object[]> resultRows = responseBlock.getRows();
+      List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+      return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
+    } else {
+      return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+  }
+
+  /**
+   * Created {@link TransferableBlock} using column indices.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema, int[] columnIndices) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (responseBlock.getQueryContext().getOrderByExpressions() != null) {
+      // extract result row in ordered fashion
+      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+      while (!priorityQueue.isEmpty()) {
+        extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema, columnIndices));
+      }
+    } else {
+      // extract result row in non-ordered fashion
+      for (Object[] row : resultRows) {
+        extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
+      }
+    }
+    return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Fallback mechanism for {@link TransferableBlock}, used when no special handling is necessary. This method only
+   * performs {@link DataSchema.ColumnDataType} canonicalization.
+   *
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema).
+   */
+  private static TransferableBlock composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (resultRows instanceof List) {
+      for (Object[] orgRow : resultRows) {
+        extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
+      }
+    } else if (resultRows instanceof PriorityQueue) {

Review Comment:
   good idea. i will change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org