Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/01 01:26:20 UTC

[GitHub] [pinot] walterddr opened a new pull request, #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

walterddr opened a new pull request, #9886:
URL: https://github.com/apache/pinot/pull/9886

   Still a draft.
   
   This PR attempts to use LeafStageSendOperator to canonicalize type mismatches, duplicate columns, and other type and schema differences between the V1 and V2 engines.
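
As a rough illustration of the canonicalization described above, the sketch below drains a `PriorityQueue` of rows in order, keeps the projected columns by index, and converts each value to the desired type. The `LeafRowCanonicalizer` class, the `Function`-based converters, and the sample rows are assumptions made for illustration; they stand in for Pinot's `DataSchema` machinery and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical helper, not part of Pinot: reorders and converts leaf-stage rows.
final class LeafRowCanonicalizer {
  private LeafRowCanonicalizer() {
  }

  /**
   * Drains the queue with poll() so rows come out in comparator order
   * (iterating a PriorityQueue directly gives no ordering guarantee),
   * keeps only the columns named by columnIndices, and converts each
   * non-null value with the converter for its output position.
   */
  static List<Object[]> canonicalize(PriorityQueue<Object[]> rows, int[] columnIndices,
      List<Function<Object, Object>> converters) {
    List<Object[]> result = new ArrayList<>(rows.size());
    while (!rows.isEmpty()) {
      Object[] row = rows.poll();
      Object[] out = new Object[columnIndices.length];
      for (int i = 0; i < columnIndices.length; i++) {
        Object value = row[columnIndices[i]];
        if (value != null) {
          out[i] = converters.get(i).apply(value);
        }
      }
      result.add(out);
    }
    return result;
  }

  public static void main(String[] args) {
    // Rows hold (orderKey, name, count); the desired output is (name, count as long).
    PriorityQueue<Object[]> rows =
        new PriorityQueue<>(Comparator.comparingInt((Object[] r) -> (Integer) r[0]));
    rows.add(new Object[]{2, "bob", 7});
    rows.add(new Object[]{1, "alice", null});
    List<Function<Object, Object>> converters =
        Arrays.asList(v -> v, v -> ((Number) v).longValue());
    for (Object[] row : canonicalize(rows, new int[]{1, 2}, converters)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

Null values are left as null rather than passed to a converter, mirroring the null check in the quoted diff.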


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9886:
URL: https://github.com/apache/pinot/pull/9886#discussion_r1037667080


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -116,23 +128,107 @@ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
     return resultRow;
   }
 
-  private static List<Object[]> toList(Collection<Object[]> collection, DataSchema dataSchema) {
-    if (collection == null || collection.isEmpty()) {
-      return new ArrayList<>();
-    }
-    List<Object[]> resultRows = new ArrayList<>(collection.size());
-    if (collection instanceof List) {
-      for (Object[] orgRow : collection) {
-        resultRows.add(canonicalizeRow(orgRow, dataSchema));
-      }
-    } else if (collection instanceof PriorityQueue) {
-      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collection;
-      while (!priorityQueue.isEmpty()) {
-        resultRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+  /**
+   * we re-arrange columns to match the projection in the case of order by - this is to ensure
+   * that V1 results match what the expected projection schema in the calcite logical operator; if
+   * we realize that there are other situations where we need to post-process v1 results to adhere to
+   * the expected results we should factor this out and also apply the canonicalization of the data
+   * types during this post-process step (also see LeafStageTransferableBlockOperator#canonicalizeRow)
+   *
+   * @param serverResultsBlock result block from leaf stage
+   * @param dataSchema the desired schema for send operator
+   * @return conformed collection of rows.
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static List<Object[]> cleanUpDataBlock(InstanceResponseBlock serverResultsBlock, DataSchema dataSchema,
+      boolean requiresCleanUp) {
+    // Extract the result rows
+    Collection<Object[]> resultRows = serverResultsBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (requiresCleanUp) {
+      DataSchema resultSchema = serverResultsBlock.getDataSchema();
+      List<String> selectionColumns =
+          SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+      int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+      DataSchema adjustedDataSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(dataSchema.getColumnDataTypes(),
+              adjustedDataSchema.getColumnDataTypes()),
+          "Incompatible result data schema: " + "Expecting: " + dataSchema + " Actual: " + adjustedDataSchema);
+      int numColumns = columnIndices.length;
+
+      if (serverResultsBlock.getQueryContext().getOrderByExpressions() != null) {
+        // extract result row in ordered fashion
+        PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+        while (!priorityQueue.isEmpty()) {
+          Object[] row = priorityQueue.poll();
+          assert row != null;
+          Object[] extractedRow = new Object[numColumns];
+          for (int colId = 0; colId < numColumns; colId++) {
+            Object value = row[columnIndices[colId]];
+            if (value != null) {
+              extractedRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+            }
+          }
+          extractedRows.add(extractedRow);
+        }
+      } else {
+        // extract result row in non-ordered fashion
+        for (Object[] row : resultRows) {
+          assert row != null;
+          Object[] extractedRow = new Object[numColumns];
+          for (int colId = 0; colId < numColumns; colId++) {
+            Object value = row[columnIndices[colId]];
+            if (value != null) {
+              extractedRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+            }
+          }
+          extractedRows.add(extractedRow);
+        }
       }
     } else {
-      throw new UnsupportedOperationException("Unsupported collection type: " + collection.getClass());
+      if (resultRows instanceof List) {
+        for (Object[] orgRow : resultRows) {
+          extractedRows.add(canonicalizeRow(orgRow, dataSchema));
+        }
+      } else if (resultRows instanceof PriorityQueue) {
+        PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+        while (!priorityQueue.isEmpty()) {
+          extractedRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+        }
+      } else {
+        throw new UnsupportedOperationException("Unsupported collection type: " + resultRows.getClass());
+      }
+    }
+    return extractedRows;
+  }
+
+  /**
+   * @see LeafStageTransferableBlockOperator#cleanUpDataBlock(InstanceResponseBlock, DataSchema, boolean)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static DataSchema cleanUpDataSchema(InstanceResponseBlock serverResultsBlock, DataSchema desiredDataSchema) {
+    DataSchema resultSchema = serverResultsBlock.getDataSchema();
+    List<String> selectionColumns =
+        SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+    Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+            adjustedResultSchema.getColumnDataTypes()),
+        "Incompatible result data schema: " + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+    return adjustedResultSchema;
+  }
+
+  private static boolean isDataSchemaColumnTypesCompatible(DataSchema.ColumnDataType[] desiredTypes,
+      DataSchema.ColumnDataType[] givenTypes) {
+    if (desiredTypes.length != givenTypes.length) {
+      return false;
+    }
+    for (int i = 0; i < desiredTypes.length; i++) {
+      if (desiredTypes[i] != givenTypes[i] && !givenTypes[i].isSuperTypeOf(desiredTypes[i])) {

Review Comment:
   I believe `isSuperTypeOf` already returns true when the types are equal; see `DataSchemaTest#testSuperTypeCheckers`.
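
If the super-type check is indeed reflexive, the explicit inequality test in the quoted condition becomes redundant. A minimal sketch under that assumption follows; `ColType`, its widening rule, and `SchemaCompat` are hypothetical stand-ins and do not reproduce `DataSchema.ColumnDataType`.

```java
// Hypothetical stand-in for a column type enum; not Pinot's DataSchema.ColumnDataType.
enum ColType {
  INT, LONG, STRING;

  // Assumed to be reflexive: a type counts as a super type of itself.
  boolean isSuperTypeOf(ColType other) {
    if (this == other) {
      return true;
    }
    // Illustrative widening rule only: LONG can hold any INT.
    return this == LONG && other == INT;
  }
}

final class SchemaCompat {
  private SchemaCompat() {
  }

  // Compatible when the arrays have equal length and every given type
  // is a super type of (or equal to) the desired type at that position.
  static boolean isCompatible(ColType[] desired, ColType[] given) {
    if (desired.length != given.length) {
      return false;
    }
    for (int i = 0; i < desired.length; i++) {
      if (!given[i].isSuperTypeOf(desired[i])) {
        return false;
      }
    }
    return true;
  }
}
```

With a reflexive check, the loop body needs only the single `isSuperTypeOf` call; the direction (given is a super type of desired) follows the quoted diff.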



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -84,11 +89,18 @@ protected TransferableBlock getNextBlock() {
     } else {
       if (_currentIndex < _baseResultBlock.size()) {
         InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
-        BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
-        if (resultsBlock != null) {
-          List<Object[]> rows =
-              toList(resultsBlock.getRows(responseBlock.getQueryContext()), responseBlock.getDataSchema());
-          return new TransferableBlock(rows, responseBlock.getDataSchema(), DataBlock.Type.ROW);
+        if (responseBlock.getResultsBlock() != null) {
+          DataSchema dataSchema = responseBlock.getDataSchema();
+          boolean requiresCleanup = responseBlock.getResultsBlock() instanceof SelectionResultsBlock

Review Comment:
   Yes, agreed. This is refactored out, one per base result type.





[GitHub] [pinot] walterddr commented on a diff in pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9886:
URL: https://github.com/apache/pinot/pull/9886#discussion_r1037667711


##########
pinot-query-runtime/src/test/resources/queries/TableExpressions.json:
##########
@@ -21,6 +21,9 @@
       { "sql": "SELECT * FROM {tbl} WHERE intCol IN (196883, 42)" },
       { "sql": "SELECT * FROM {tbl} WHERE intCol NOT IN (196883, 42) AND strCol IN ('alice')" },
       { "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },
+      { "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol < 100)" },
+      { "sql": "SELECT * FROM {tbl} WHERE strCol NOT IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },

Review Comment:
   Will test that, but `NOT IN` actually failed on a different issue than the leaf stage, so I will break that out into a separate PR.





[GitHub] [pinot] walterddr closed pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
walterddr closed pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type
URL: https://github.com/apache/pinot/pull/9886




[GitHub] [pinot] walterddr commented on pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
walterddr commented on PR #9886:
URL: https://github.com/apache/pinot/pull/9886#issuecomment-1361983276

   Closing; this is done via #9892 and #10020.




[GitHub] [pinot] agavra commented on a diff in pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9886:
URL: https://github.com/apache/pinot/pull/9886#discussion_r1037504232


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -84,11 +89,18 @@ protected TransferableBlock getNextBlock() {
     } else {
       if (_currentIndex < _baseResultBlock.size()) {
         InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
-        BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
-        if (resultsBlock != null) {
-          List<Object[]> rows =
-              toList(resultsBlock.getRows(responseBlock.getQueryContext()), responseBlock.getDataSchema());
-          return new TransferableBlock(rows, responseBlock.getDataSchema(), DataBlock.Type.ROW);
+        if (responseBlock.getResultsBlock() != null) {
+          DataSchema dataSchema = responseBlock.getDataSchema();
+          boolean requiresCleanup = responseBlock.getResultsBlock() instanceof SelectionResultsBlock

Review Comment:
   nit: can we break this out into its own method? It will be a bit easier if we need to set debugger breakpoints and whatnot.
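
One way such an extraction could look is sketched below. The three block classes are hypothetical stand-ins that only echo names from the diff, and the predicate models just the `instanceof` test visible in the quoted line; the remainder of the original condition is cut off in the quote and is deliberately not guessed at.

```java
// Hypothetical stand-ins for the results-block hierarchy; only the names echo the diff.
class BaseResultsBlock {
}

class SelectionResultsBlock extends BaseResultsBlock {
}

class OtherResultsBlock extends BaseResultsBlock {
}

final class CleanupCheck {
  private CleanupCheck() {
  }

  /**
   * Named predicate in place of an inline boolean expression, so a debugger
   * breakpoint can sit on the decision itself. Only the instanceof test that
   * is visible in the quoted diff is modeled here.
   */
  static boolean requiresCleanUp(BaseResultsBlock resultsBlock) {
    return resultsBlock instanceof SelectionResultsBlock;
  }
}
```

A caller would then read `boolean requiresCleanup = CleanupCheck.requiresCleanUp(block);`, which keeps the branching logic in `getNextBlock()` short.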



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -116,23 +128,107 @@ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
     return resultRow;
   }
 
-  private static List<Object[]> toList(Collection<Object[]> collection, DataSchema dataSchema) {
-    if (collection == null || collection.isEmpty()) {
-      return new ArrayList<>();
-    }
-    List<Object[]> resultRows = new ArrayList<>(collection.size());
-    if (collection instanceof List) {
-      for (Object[] orgRow : collection) {
-        resultRows.add(canonicalizeRow(orgRow, dataSchema));
-      }
-    } else if (collection instanceof PriorityQueue) {
-      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collection;
-      while (!priorityQueue.isEmpty()) {
-        resultRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+  /**
+   * we re-arrange columns to match the projection in the case of order by - this is to ensure
+   * that V1 results match what the expected projection schema in the calcite logical operator; if
+   * we realize that there are other situations where we need to post-process v1 results to adhere to
+   * the expected results we should factor this out and also apply the canonicalization of the data
+   * types during this post-process step (also see LeafStageTransferableBlockOperator#canonicalizeRow)
+   *
+   * @param serverResultsBlock result block from leaf stage
+   * @param dataSchema the desired schema for send operator
+   * @return conformed collection of rows.
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static List<Object[]> cleanUpDataBlock(InstanceResponseBlock serverResultsBlock, DataSchema dataSchema,
+      boolean requiresCleanUp) {
+    // Extract the result rows
+    Collection<Object[]> resultRows = serverResultsBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (requiresCleanUp) {
+      DataSchema resultSchema = serverResultsBlock.getDataSchema();
+      List<String> selectionColumns =
+          SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+      int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+      DataSchema adjustedDataSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+      Preconditions.checkState(isDataSchemaColumnTypesCompatible(dataSchema.getColumnDataTypes(),
+              adjustedDataSchema.getColumnDataTypes()),
+          "Incompatible result data schema: " + "Expecting: " + dataSchema + " Actual: " + adjustedDataSchema);
+      int numColumns = columnIndices.length;
+
+      if (serverResultsBlock.getQueryContext().getOrderByExpressions() != null) {
+        // extract result row in ordered fashion
+        PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+        while (!priorityQueue.isEmpty()) {
+          Object[] row = priorityQueue.poll();
+          assert row != null;
+          Object[] extractedRow = new Object[numColumns];
+          for (int colId = 0; colId < numColumns; colId++) {
+            Object value = row[columnIndices[colId]];
+            if (value != null) {
+              extractedRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+            }
+          }
+          extractedRows.add(extractedRow);
+        }
+      } else {
+        // extract result row in non-ordered fashion
+        for (Object[] row : resultRows) {
+          assert row != null;
+          Object[] extractedRow = new Object[numColumns];
+          for (int colId = 0; colId < numColumns; colId++) {
+            Object value = row[columnIndices[colId]];
+            if (value != null) {
+              extractedRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+            }
+          }
+          extractedRows.add(extractedRow);
+        }
       }
     } else {
-      throw new UnsupportedOperationException("Unsupported collection type: " + collection.getClass());
+      if (resultRows instanceof List) {
+        for (Object[] orgRow : resultRows) {
+          extractedRows.add(canonicalizeRow(orgRow, dataSchema));
+        }
+      } else if (resultRows instanceof PriorityQueue) {
+        PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
+        while (!priorityQueue.isEmpty()) {
+          extractedRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+        }
+      } else {
+        throw new UnsupportedOperationException("Unsupported collection type: " + resultRows.getClass());
+      }
+    }
+    return extractedRows;
+  }
+
+  /**
+   * @see LeafStageTransferableBlockOperator#cleanUpDataBlock(InstanceResponseBlock, DataSchema, boolean)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static DataSchema cleanUpDataSchema(InstanceResponseBlock serverResultsBlock, DataSchema desiredDataSchema) {
+    DataSchema resultSchema = serverResultsBlock.getDataSchema();
+    List<String> selectionColumns =
+        SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+    Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+            adjustedResultSchema.getColumnDataTypes()),
+        "Incompatible result data schema: " + "Expecting: " + desiredDataSchema + " Actual: " + adjustedResultSchema);
+    return adjustedResultSchema;
+  }
+
+  private static boolean isDataSchemaColumnTypesCompatible(DataSchema.ColumnDataType[] desiredTypes,
+      DataSchema.ColumnDataType[] givenTypes) {
+    if (desiredTypes.length != givenTypes.length) {
+      return false;
+    }
+    for (int i = 0; i < desiredTypes.length; i++) {
+      if (desiredTypes[i] != givenTypes[i] && !givenTypes[i].isSuperTypeOf(desiredTypes[i])) {

Review Comment:
   should we update `isSuperTypeOf` to return true if the types are equal?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -116,23 +128,107 @@ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
     return resultRow;
   }
 
-  private static List<Object[]> toList(Collection<Object[]> collection, DataSchema dataSchema) {
-    if (collection == null || collection.isEmpty()) {
-      return new ArrayList<>();
-    }
-    List<Object[]> resultRows = new ArrayList<>(collection.size());
-    if (collection instanceof List) {
-      for (Object[] orgRow : collection) {
-        resultRows.add(canonicalizeRow(orgRow, dataSchema));
-      }
-    } else if (collection instanceof PriorityQueue) {
-      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collection;
-      while (!priorityQueue.isEmpty()) {
-        resultRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+  /**
+   * we re-arrange columns to match the projection in the case of order by - this is to ensure
+   * that V1 results match what the expected projection schema in the calcite logical operator; if
+   * we realize that there are other situations where we need to post-process v1 results to adhere to
+   * the expected results we should factor this out and also apply the canonicalization of the data
+   * types during this post-process step (also see LeafStageTransferableBlockOperator#canonicalizeRow)
+   *
+   * @param serverResultsBlock result block from leaf stage
+   * @param dataSchema the desired schema for send operator
+   * @return conformed collection of rows.
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static List<Object[]> cleanUpDataBlock(InstanceResponseBlock serverResultsBlock, DataSchema dataSchema,
+      boolean requiresCleanUp) {
+    // Extract the result rows
+    Collection<Object[]> resultRows = serverResultsBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (requiresCleanUp) {
+      DataSchema resultSchema = serverResultsBlock.getDataSchema();
+      List<String> selectionColumns =
+          SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+      int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   This is certainly premature optimization, but just dropping a note here: do we need to do this on each block? I assume the result of this computation is the same for all blocks. Let's not address this in this PR, but it is something to think about.
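
For reference, computing the indices once and reusing them could be sketched as follows. `ColumnIndexCache` and its column-name lists are hypothetical and not part of Pinot; the sketch assumes every block shares the same result schema, and it is not thread-safe.

```java
import java.util.List;

// Hypothetical cache, not part of Pinot: resolves selection columns to indices once.
final class ColumnIndexCache {
  private final List<String> _selectionColumns;
  private int[] _columnIndices; // computed on first use, reused afterwards
  private int _computeCount;    // exposed only to show how often the work runs

  ColumnIndexCache(List<String> selectionColumns) {
    _selectionColumns = selectionColumns;
  }

  /**
   * Returns the position of each selection column within resultColumns.
   * Assumes all blocks carry the same result schema, so the first answer
   * is valid for every later block. A missing column maps to -1.
   */
  int[] indicesFor(List<String> resultColumns) {
    if (_columnIndices == null) {
      _computeCount++;
      int[] indices = new int[_selectionColumns.size()];
      for (int i = 0; i < indices.length; i++) {
        indices[i] = resultColumns.indexOf(_selectionColumns.get(i));
      }
      _columnIndices = indices;
    }
    return _columnIndices;
  }

  int computeCount() {
    return _computeCount;
  }
}
```

The cached array is shared between calls, so callers should treat it as read-only.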



##########
pinot-query-runtime/src/test/resources/queries/TableExpressions.json:
##########
@@ -21,6 +21,9 @@
       { "sql": "SELECT * FROM {tbl} WHERE intCol IN (196883, 42)" },
       { "sql": "SELECT * FROM {tbl} WHERE intCol NOT IN (196883, 42) AND strCol IN ('alice')" },
       { "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },
+      { "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol < 100)" },
+      { "sql": "SELECT * FROM {tbl} WHERE strCol NOT IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },

Review Comment:
   can we test `NOT IN` with various different types?
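
A small generator for such cases might look like the sketch below. It emits entries in the `{ "sql": ... }` shape used by `TableExpressions.json`. `NotInQueryGenerator` is hypothetical, `intCol` and `strCol` with their literals come from the quoted test file, and `doubleCol` with its literals is an assumed column used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical generator, not part of Pinot: builds NOT IN test entries per column.
final class NotInQueryGenerator {
  private NotInQueryGenerator() {
  }

  /**
   * For each (column, comma-separated literal list) pair, produces one JSON
   * entry with a NOT IN filter on that column, in map iteration order.
   */
  static List<String> generate(Map<String, String> literalsByColumn) {
    List<String> entries = new ArrayList<>();
    for (Map.Entry<String, String> entry : literalsByColumn.entrySet()) {
      String sql = "SELECT * FROM {tbl} WHERE " + entry.getKey() + " NOT IN (" + entry.getValue() + ")";
      entries.add("{ \"sql\": \"" + sql + "\" }");
    }
    return entries;
  }

  public static void main(String[] args) {
    Map<String, String> literalsByColumn = new LinkedHashMap<>();
    literalsByColumn.put("intCol", "196883, 42");
    literalsByColumn.put("strCol", "'alice'");
    literalsByColumn.put("doubleCol", "1.5, 2.5"); // assumed column, for illustration only
    for (String entry : generate(literalsByColumn)) {
      System.out.println(entry + ",");
    }
  }
}
```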





[GitHub] [pinot] walterddr commented on a diff in pull request #9886: [draft][multistage] Fixing NOT IN and the leaf-stage return type

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9886:
URL: https://github.com/apache/pinot/pull/9886#discussion_r1037667479


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -116,23 +128,107 @@ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema) {
     return resultRow;
   }
 
-  private static List<Object[]> toList(Collection<Object[]> collection, DataSchema dataSchema) {
-    if (collection == null || collection.isEmpty()) {
-      return new ArrayList<>();
-    }
-    List<Object[]> resultRows = new ArrayList<>(collection.size());
-    if (collection instanceof List) {
-      for (Object[] orgRow : collection) {
-        resultRows.add(canonicalizeRow(orgRow, dataSchema));
-      }
-    } else if (collection instanceof PriorityQueue) {
-      PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) collection;
-      while (!priorityQueue.isEmpty()) {
-        resultRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+  /**
+   * we re-arrange columns to match the projection in the case of order by - this is to ensure
+   * that V1 results match what the expected projection schema in the calcite logical operator; if
+   * we realize that there are other situations where we need to post-process v1 results to adhere to
+   * the expected results we should factor this out and also apply the canonicalization of the data
+   * types during this post-process step (also see LeafStageTransferableBlockOperator#canonicalizeRow)
+   *
+   * @param serverResultsBlock result block from leaf stage
+   * @param dataSchema the desired schema for send operator
+   * @return conformed collection of rows.
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static List<Object[]> cleanUpDataBlock(InstanceResponseBlock serverResultsBlock, DataSchema dataSchema,
+      boolean requiresCleanUp) {
+    // Extract the result rows
+    Collection<Object[]> resultRows = serverResultsBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (requiresCleanUp) {
+      DataSchema resultSchema = serverResultsBlock.getDataSchema();
+      List<String> selectionColumns =
+          SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(), resultSchema);
+      int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);

Review Comment:
   Since the leaf stage returns one block per server (or two in the hybrid table case), I won't worry about it.


