You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 20:55:52 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4301: Use tournament loser tree for k-way sort-merging, increase merge speed by 50%

alamb commented on code in PR #4301:
URL: https://github.com/apache/arrow-datafusion/pull/4301#discussion_r1034040105


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -570,45 +606,57 @@ impl SortPreservingMergeStream {
         let _timer = elapsed_compute.timer();
 
         loop {
-            match self.heap.pop() {
-                Some(Reverse(mut cursor)) => {
-                    let stream_idx = cursor.stream_idx();
-                    let batch_idx = self.batches[stream_idx].len() - 1;
-                    let row_idx = cursor.advance();
-
-                    let mut cursor_finished = false;
-                    // insert the cursor back to heap if the record batch is not exhausted
-                    if !cursor.is_finished() {
-                        self.heap.push(Reverse(cursor));
-                    } else {
-                        cursor_finished = true;
-                        self.cursor_finished[stream_idx] = true;
+            // Adjust the loser tree if necessary
+            if !self.loser_tree_adjusted {
+                let mut winner = self.loser_tree[0];
+                match futures::ready!(self.maybe_poll_stream(cx, winner)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
                     }
+                }
 
-                    self.in_progress.push(RowIndex {
-                        stream_idx,
-                        batch_idx,
-                        row_idx,
-                    });
-
-                    if self.in_progress.len() == self.batch_size {
-                        return Poll::Ready(Some(self.build_record_batch()));
+                let mut cmp_node = (num_streams + winner) / 2;
+                while cmp_node != 0 {
+                    let challenger = self.loser_tree[cmp_node];
+                    let challenger_win =
+                        match (&self.cursors[winner], &self.cursors[challenger]) {
+                            (None, _) => true,
+                            (_, None) => false,
+                            (Some(winner), Some(challenger)) => challenger < winner,
+                        };
+                    if challenger_win {
+                        self.loser_tree[cmp_node] = winner;
+                        winner = challenger;
                     }
+                    cmp_node /= 2;
+                }
+                self.loser_tree[0] = winner;
+                self.loser_tree_adjusted = true;
+            }
 
-                    // If removed the last row from the cursor, need to fetch a new record
-                    // batch if possible, before looping round again
-                    if cursor_finished {
-                        match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
-                            Ok(_) => {}
-                            Err(e) => {
-                                self.aborted = true;
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        }
-                    }
+            let min_cursor_idx = self.loser_tree[0];

Review Comment:
   I couldn't make this work in https://github.com/apache/arrow-datafusion/pull/4407



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org