You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 21:03:58 UTC

[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

tustvold commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1034046414


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,112 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non-exhausted input, if possible.
+    ///
+    /// Returns `TreeUpdate::Complete` on success (or if the tree is already built), or
+    /// `TreeUpdate::Incomplete(poll)` if any of the inputs is not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled; bail out on the first stream that is pending or errors
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+                }
+                Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+            }
+        }
+
+        // Init loser tree: every node starts as usize::MAX (unset), then each stream is played upwards
+        self.loser_tree.resize(num_streams, usize::MAX);
+        for i in 0..num_streams {
+            let mut winner = i;
+            let mut cmp_node = (num_streams + i) / 2; // first node stream `i` is compared at
+            while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+                let challenger = self.loser_tree[cmp_node];
+                let challenger_win =
+                    match (&self.cursors[winner], &self.cursors[challenger]) {
+                        (None, _) => true, // winner has no cursor: the challenger wins
+                        (_, None) => false, // challenger has no cursor: the winner stands
+                        (Some(winner), Some(challenger)) => challenger < winner, // smaller cursor wins
+                    };
+
+                if challenger_win {
+                    self.loser_tree[cmp_node] = winner; // the loser stays at this node
+                    winner = challenger;
+                }
+
+                cmp_node /= 2; // move to the parent node
+            }
+            self.loser_tree[cmp_node] = winner; // first unset node reached, or the root (index 0)
+        }
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+
+    /// Attempts to update the loser tree, if possible
+    ///
+    /// Returns `TreeUpdate::Complete` on success (or if the tree is already adjusted), or
+    /// `TreeUpdate::Incomplete(poll)` if the winning input was not ready or errored
+    #[inline]
+    fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        if self.loser_tree_adjusted {
+            return TreeUpdate::Complete;
+        }
+
+        let num_streams = self.streams.num_streams();
+        let mut winner = self.loser_tree[0]; // index 0 holds the current overall winner
+        match self.maybe_poll_stream(cx, winner) {
+            Poll::Ready(Ok(_)) => {}
+            Poll::Ready(Err(e)) => {
+                self.aborted = true;
+                return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+            }
+            Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+        }
+
+        // Replace overall winner by walking tree of losers from the winner's node up to the root
+        let mut cmp_node = (num_streams + winner) / 2;
+        while cmp_node != 0 {
+            let challenger = self.loser_tree[cmp_node];
+            let challenger_win = match (&self.cursors[winner], &self.cursors[challenger])
+            {
+                (None, _) => true, // winner has no cursor: the challenger wins
+                (_, None) => false, // challenger has no cursor: the winner stands
+                (Some(winner), Some(challenger)) => challenger < winner, // smaller cursor wins
+            };
+            if challenger_win {
+                self.loser_tree[cmp_node] = winner; // the loser stays at this node
+                winner = challenger;
+            }
+            cmp_node /= 2; // move to the parent node
+        }
+        self.loser_tree[0] = winner;
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {
+    /// The tree update could not be completed (e.g. the input was not
+    /// ready or had an error). The caller should return the `Poll`
+    /// result to its caller
+    Incomplete(Poll<Option<ArrowResult<RecordBatch>>>),

Review Comment:
   ```suggestion
       Pending,
       
       Error(ArrowError),
   ```
   
   These two variants should be sufficient, given that we never seem to return `TreeUpdate::Incomplete(Poll::Ready(None))` or `TreeUpdate::Incomplete(Poll::Ready(Some(Ok(_))))`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org