You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 20:46:30 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #4407: Refactor loser tree code in SortPreservingMerge per PR comments

alamb opened a new pull request, #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407

   # Which issue does this PR close?
   
   re https://github.com/apache/arrow-datafusion/issues/4300.
   
   # Rationale for this change
   1. I wanted to get the merge speed improvements into DataFusion
   2. I wanted an excuse to work on the code myself for a bit
   
   # What changes are included in this PR?
   
   Implements suggestions from @tustvold  and @viirya  in https://github.com/apache/arrow-datafusion/pull/4301 
   
   # Are these changes tested?
   
   covered by existing tests
   
   TODO: need to run benchmarks
   
   # Are there any user-facing changes?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#issuecomment-1381074814

   Benchmark runs are scheduled for baseline = 0d27fcb04b71693adf570346507ca6282f8cba71 and contender = 82bbaa3dd25a0b174764946be2cfd94b8eda0a68. 82bbaa3dd25a0b174764946be2cfd94b8eda0a68 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/6e5705d84121408a834ce15dc5ec2c3a...bcadee805b2c4bc19104741bc782ef07/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/3d9c163e784d4afaa4a22a5b7e8e1a9f...d44d6d8b7ea84a61ad7116d0c8b1137e/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/eafcb6bd637048f2b8ff8def049b4983...416f058bb8cb48578e7110f15b4ec32c/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/278e3ae05ed1432faabde90d4126199c...813de48f2192413582a2f724326fb2d2/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1034046414


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,112 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non exhausted input, if possible.
+    ///
+    /// Returns None on success, or Some(poll) if any of the inputs
+    /// are not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+                }
+                Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+            }
+        }
+
+        // Init loser tree
+        self.loser_tree.resize(num_streams, usize::MAX);
+        for i in 0..num_streams {
+            let mut winner = i;
+            let mut cmp_node = (num_streams + i) / 2;
+            while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+                let challenger = self.loser_tree[cmp_node];
+                let challenger_win =
+                    match (&self.cursors[winner], &self.cursors[challenger]) {
+                        (None, _) => true,
+                        (_, None) => false,
+                        (Some(winner), Some(challenger)) => challenger < winner,
+                    };
+
+                if challenger_win {
+                    self.loser_tree[cmp_node] = winner;
+                    winner = challenger;
+                }
+
+                cmp_node /= 2;
+            }
+            self.loser_tree[cmp_node] = winner;
+        }
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+
+    /// Attempts to updated the loser tree, if possible
+    ///
+    /// Returns None on success, or Some(poll) if the winning input
+    /// was not ready or errored
+    #[inline]
+    fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        if self.loser_tree_adjusted {
+            return TreeUpdate::Complete;
+        }
+
+        let num_streams = self.streams.num_streams();
+        let mut winner = self.loser_tree[0];
+        match self.maybe_poll_stream(cx, winner) {
+            Poll::Ready(Ok(_)) => {}
+            Poll::Ready(Err(e)) => {
+                self.aborted = true;
+                return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+            }
+            Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+        }
+
+        // Replace overall winner by walking tree of losers
+        let mut cmp_node = (num_streams + winner) / 2;
+        while cmp_node != 0 {
+            let challenger = self.loser_tree[cmp_node];
+            let challenger_win = match (&self.cursors[winner], &self.cursors[challenger])
+            {
+                (None, _) => true,
+                (_, None) => false,
+                (Some(winner), Some(challenger)) => challenger < winner,
+            };
+            if challenger_win {
+                self.loser_tree[cmp_node] = winner;
+                winner = challenger;
+            }
+            cmp_node /= 2;
+        }
+        self.loser_tree[0] = winner;
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {
+    /// The tree update could not be completed (e.g. the input was not
+    /// ready or had an error). The caller should return the `Poll`
+    /// result to its caller
+    Incomplete(Poll<Option<ArrowResult<RecordBatch>>>),

Review Comment:
   ```suggestion
       Pending,
       
       Error(ArrowError),
   ```
   
   Given we never seem to return `TreeUpdate::Incomplete(Poll::Ready(None))` or `TreeUpdate::Incomplete(Poll::Ready(Some(Ok(_))))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1067925771


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,127 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non exhausted input, if possible.
+    ///
+    /// Returns None on success, or Some(poll) if any of the inputs
+    /// are not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Error(e);
+                }
+                Poll::Pending => return TreeUpdate::Pending,
+            }
+        }
+
+        // Init loser tree
+        self.loser_tree.resize(num_streams, usize::MAX);
+        for i in 0..num_streams {
+            let mut winner = i;
+            let mut cmp_node = (num_streams + i) / 2;
+            while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+                let challenger = self.loser_tree[cmp_node];
+                let challenger_win =
+                    match (&self.cursors[winner], &self.cursors[challenger]) {
+                        (None, _) => true,
+                        (_, None) => false,
+                        (Some(winner), Some(challenger)) => challenger < winner,
+                    };
+
+                if challenger_win {
+                    self.loser_tree[cmp_node] = winner;
+                    winner = challenger;
+                }
+
+                cmp_node /= 2;
+            }
+            self.loser_tree[cmp_node] = winner;
+        }
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+
+    /// Attempts to updated the loser tree, if possible
+    ///
+    /// Returns None on success, or Some(poll) if the winning input
+    /// was not ready or errored
+    #[inline]
+    fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        if self.loser_tree_adjusted {
+            return TreeUpdate::Complete;
+        }
+
+        let num_streams = self.streams.num_streams();
+        let mut winner = self.loser_tree[0];
+        match self.maybe_poll_stream(cx, winner) {
+            Poll::Ready(Ok(_)) => {}
+            Poll::Ready(Err(e)) => {
+                self.aborted = true;
+                return TreeUpdate::Error(e);
+            }
+            Poll::Pending => return TreeUpdate::Pending,
+        }
+
+        // Replace overall winner by walking tree of losers
+        let mut cmp_node = (num_streams + winner) / 2;
+        while cmp_node != 0 {
+            let challenger = self.loser_tree[cmp_node];
+            let challenger_win = match (&self.cursors[winner], &self.cursors[challenger])
+            {
+                (None, _) => true,
+                (_, None) => false,
+                (Some(winner), Some(challenger)) => challenger < winner,
+            };
+            if challenger_win {
+                self.loser_tree[cmp_node] = winner;
+                winner = challenger;
+            }
+            cmp_node /= 2;
+        }
+        self.loser_tree[0] = winner;
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {

Review Comment:
   I tried it out and you are right -- I think `Poll` made the code better -- in 2237abed8



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1034055248


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,112 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non exhausted input, if possible.
+    ///
+    /// Returns None on success, or Some(poll) if any of the inputs
+    /// are not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+                }
+                Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+            }
+        }
+
+        // Init loser tree
+        self.loser_tree.resize(num_streams, usize::MAX);
+        for i in 0..num_streams {
+            let mut winner = i;
+            let mut cmp_node = (num_streams + i) / 2;
+            while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+                let challenger = self.loser_tree[cmp_node];
+                let challenger_win =
+                    match (&self.cursors[winner], &self.cursors[challenger]) {
+                        (None, _) => true,
+                        (_, None) => false,
+                        (Some(winner), Some(challenger)) => challenger < winner,
+                    };
+
+                if challenger_win {
+                    self.loser_tree[cmp_node] = winner;
+                    winner = challenger;
+                }
+
+                cmp_node /= 2;
+            }
+            self.loser_tree[cmp_node] = winner;
+        }
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+
+    /// Attempts to updated the loser tree, if possible
+    ///
+    /// Returns None on success, or Some(poll) if the winning input
+    /// was not ready or errored
+    #[inline]
+    fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        if self.loser_tree_adjusted {
+            return TreeUpdate::Complete;
+        }
+
+        let num_streams = self.streams.num_streams();
+        let mut winner = self.loser_tree[0];
+        match self.maybe_poll_stream(cx, winner) {
+            Poll::Ready(Ok(_)) => {}
+            Poll::Ready(Err(e)) => {
+                self.aborted = true;
+                return TreeUpdate::Incomplete(Poll::Ready(Some(Err(e))));
+            }
+            Poll::Pending => return TreeUpdate::Incomplete(Poll::Pending),
+        }
+
+        // Replace overall winner by walking tree of losers
+        let mut cmp_node = (num_streams + winner) / 2;
+        while cmp_node != 0 {
+            let challenger = self.loser_tree[cmp_node];
+            let challenger_win = match (&self.cursors[winner], &self.cursors[challenger])
+            {
+                (None, _) => true,
+                (_, None) => false,
+                (Some(winner), Some(challenger)) => challenger < winner,
+            };
+            if challenger_win {
+                self.loser_tree[cmp_node] = winner;
+                winner = challenger;
+            }
+            cmp_node /= 2;
+        }
+        self.loser_tree[0] = winner;
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {
+    /// The tree update could not be completed (e.g. the input was not
+    /// ready or had an error). The caller should return the `Poll`
+    /// result to its caller
+    Incomplete(Poll<Option<ArrowResult<RecordBatch>>>),

Review Comment:
   in 1bdb25af1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1067087709


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,127 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non exhausted input, if possible.
+    ///
+    /// Returns None on success, or Some(poll) if any of the inputs
+    /// are not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Error(e);
+                }
+                Poll::Pending => return TreeUpdate::Pending,

Review Comment:
   If this method returned Poll this could just use `ready!` or even `?`



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,127 @@ impl SortPreservingMergeStream {
             }
         }
     }
+
+    /// Attempts to initialize the loser tree with one value from each
+    /// non exhausted input, if possible.
+    ///
+    /// Returns None on success, or Some(poll) if any of the inputs
+    /// are not ready or errored
+    #[inline]
+    fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        let num_streams = self.streams.num_streams();
+
+        if !self.loser_tree.is_empty() {
+            return TreeUpdate::Complete;
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..num_streams {
+            match self.maybe_poll_stream(cx, i) {
+                Poll::Ready(Ok(_)) => {}
+                Poll::Ready(Err(e)) => {
+                    self.aborted = true;
+                    return TreeUpdate::Error(e);
+                }
+                Poll::Pending => return TreeUpdate::Pending,
+            }
+        }
+
+        // Init loser tree
+        self.loser_tree.resize(num_streams, usize::MAX);
+        for i in 0..num_streams {
+            let mut winner = i;
+            let mut cmp_node = (num_streams + i) / 2;
+            while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+                let challenger = self.loser_tree[cmp_node];
+                let challenger_win =
+                    match (&self.cursors[winner], &self.cursors[challenger]) {
+                        (None, _) => true,
+                        (_, None) => false,
+                        (Some(winner), Some(challenger)) => challenger < winner,
+                    };
+
+                if challenger_win {
+                    self.loser_tree[cmp_node] = winner;
+                    winner = challenger;
+                }
+
+                cmp_node /= 2;
+            }
+            self.loser_tree[cmp_node] = winner;
+        }
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+
+    /// Attempts to updated the loser tree, if possible
+    ///
+    /// Returns None on success, or Some(poll) if the winning input
+    /// was not ready or errored
+    #[inline]
+    fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> TreeUpdate {
+        if self.loser_tree_adjusted {
+            return TreeUpdate::Complete;
+        }
+
+        let num_streams = self.streams.num_streams();
+        let mut winner = self.loser_tree[0];
+        match self.maybe_poll_stream(cx, winner) {
+            Poll::Ready(Ok(_)) => {}
+            Poll::Ready(Err(e)) => {
+                self.aborted = true;
+                return TreeUpdate::Error(e);
+            }
+            Poll::Pending => return TreeUpdate::Pending,
+        }
+
+        // Replace overall winner by walking tree of losers
+        let mut cmp_node = (num_streams + winner) / 2;
+        while cmp_node != 0 {
+            let challenger = self.loser_tree[cmp_node];
+            let challenger_win = match (&self.cursors[winner], &self.cursors[challenger])
+            {
+                (None, _) => true,
+                (_, None) => false,
+                (Some(winner), Some(challenger)) => challenger < winner,
+            };
+            if challenger_win {
+                self.loser_tree[cmp_node] = winner;
+                winner = challenger;
+            }
+            cmp_node /= 2;
+        }
+        self.loser_tree[0] = winner;
+        self.loser_tree_adjusted = true;
+        TreeUpdate::Complete
+    }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {

Review Comment:
   I don't really see the need for this over `Poll<Result<()>>` in particular it is confusing what its semantics are w.r.t wakers
   
   ```
   if let Err(e) = self.build()? {
       return Poll::Ready(Err(e))
   }
   ```
   
   Is not significantly more verbose



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #4407: Refactor loser tree code in SortPreservingMerge per PR comments

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#issuecomment-1329851832

   Here are performance results. It is somewhat of a mixed bag (some show a few percent less) results are 
   [results.txt](https://github.com/apache/arrow-datafusion/files/10108103/results.txt)
   
   The first run is the second time I ran `cargo bench` against 0d334cf7 (aka no code change) and it reports some regressions and then the second run is with the changes in this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org