You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/03/09 19:13:50 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10357: Enhance select order-by combine to use merge sort

Jackie-Jiang commented on code in PR #10357:
URL: https://github.com/apache/pinot/pull/10357#discussion_r1131485629


##########
pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java:
##########
@@ -192,37 +192,81 @@ public static DataSchema getResultTableDataSchema(DataSchema dataSchema, List<St
   /**
    * Merge two partial results for selection queries without <code>ORDER BY</code>. (Server side)
    *
-   * @param mergedRows partial results 1.
-   * @param rowsToMerge partial results 2.
+   * @param mergedBlock partial results 1.
+   * @param blockToMerge partial results 2.
    * @param selectionSize size of the selection.
    */
-  public static void mergeWithoutOrdering(Collection<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+  public static void mergeWithoutOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
       int selectionSize) {
-    Iterator<Object[]> iterator = rowsToMerge.iterator();
-    int numMergedRows = 0;
-    while (mergedRows.size() < selectionSize && iterator.hasNext()) {
-      mergedRows.add(iterator.next());
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
-      numMergedRows++;
+    List<Object[]> mergedRows = mergedBlock.getRows();
+    List<Object[]> rowsToMerge = blockToMerge.getRows();
+    int numRowsToMerge = Math.min(selectionSize - mergedRows.size(), rowsToMerge.size());
+    if (numRowsToMerge > 0) {
+      mergedRows.addAll(rowsToMerge.subList(0, numRowsToMerge));
     }
   }
 
   /**
    * Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
-   * TODO: Should use type compatible comparator to compare the rows
    *
-   * @param mergedRows partial results 1.
-   * @param rowsToMerge partial results 2.
+   * @param mergedBlock partial results 1 (sorted).
+   * @param blockToMerge partial results 2 (sorted).
    * @param maxNumRows maximum number of rows need to be stored.
    */
-  public static void mergeWithOrdering(PriorityQueue<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+  public static void mergeWithOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
       int maxNumRows) {
+    List<Object[]> sortedRows1 = mergedBlock.getRows();
+    List<Object[]> sortedRows2 = blockToMerge.getRows();
+    Comparator<? super Object[]> comparator = mergedBlock.getComparator();
+    assert comparator != null;
+    int numSortedRows1 = sortedRows1.size();
+    int numSortedRows2 = sortedRows2.size();
+    if (numSortedRows1 == 0) {
+      mergedBlock.setRows(sortedRows2);
+      return;
+    }
+    if (numSortedRows2 == 0 || (numSortedRows1 == maxNumRows
+        && comparator.compare(sortedRows1.get(numSortedRows1 - 1), sortedRows2.get(0)) <= 0)) {
+      return;
+    }
+    int numRowsToMerge = Math.min(numSortedRows1 + numSortedRows2, maxNumRows);
+    List<Object[]> mergedRows = new ArrayList<>(numRowsToMerge);
+    int i1 = 0;
+    int i2 = 0;
     int numMergedRows = 0;
-    for (Object[] row : rowsToMerge) {
-      addToPriorityQueue(row, mergedRows, maxNumRows);
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
-      numMergedRows++;
+    while (i1 < numSortedRows1 && i2 < numSortedRows2 && numMergedRows < numRowsToMerge) {
+      Object[] row1 = sortedRows1.get(i1);
+      Object[] row2 = sortedRows2.get(i2);
+      int compareResult = comparator.compare(row1, row2);
+      if (compareResult < 0) {
+        mergedRows.add(row1);
+        i1++;
+        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows++);

Review Comment:
   We cannot do that because the last branch might merge 2 rows, and the inner if checks the updated `numMergedRows`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org