Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/21 17:48:59 UTC

[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1624: Optimize `SortPreservingMergeStream` to avoid `SortKeyCursor` sharing

tustvold commented on a change in pull request #1624:
URL: https://github.com/apache/arrow-datafusion/pull/1624#discussion_r789872025



##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -519,10 +525,10 @@ impl SortPreservingMergeStream {
         // any RowIndex's reliant on the cursor indexes
         //
         // We can therefore drop all but the last cursor for each stream
-        for cursors in &mut self.cursors {
-            if cursors.len() > 1 {
+        for batches in &mut self.batches {

Review comment:
       Comment needs updating: it still refers to cursors, but the loop now iterates over `self.batches`.
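A minimal sketch of the pruning step with the code comment reworded in terms of batches. It assumes each stream buffers its batches in a `VecDeque`, as the new `batches` field does; the function name `prune_batches` is hypothetical and the element type is left generic so the sketch has no arrow dependency.

```rust
use std::collections::VecDeque;

/// Drop every buffered batch except the most recent one for each stream.
///
/// Once all buffered rows have been yielded to the output, no `RowIndex`
/// refers to the older batches any more, so only the last batch of each
/// stream (which may still have unconsumed rows) needs to be kept.
fn prune_batches<B>(batches: &mut [VecDeque<B>]) {
    for stream_batches in batches.iter_mut() {
        if stream_batches.len() > 1 {
            // Remove everything in front of the last batch.
            let keep_from = stream_batches.len() - 1;
            stream_batches.drain(..keep_from);
        }
    }
}
```

Called as `prune_batches(&mut self.batches)`, the `Vec` coerces to a slice, so the helper works for `Vec<VecDeque<Arc<RecordBatch>>>` as well as any other element type.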

##########
File path: datafusion/src/physical_plan/sorts/mod.rs
##########
@@ -103,17 +102,14 @@ impl SortKeyCursor {
     }
 
     fn is_finished(&self) -> bool {
-        self.num_rows == self.cur_row()
+        self.num_rows == self.cur_row
     }
 
-    fn advance(&self) -> usize {
+    fn advance(&mut self) -> usize {
         assert!(!self.is_finished());
-        self.cur_row
-            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)

Review comment:
       :heart: 
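The diff is cut off before the new body of `advance`. A minimal sketch of how the row counter could work once the cursor is no longer shared, assuming `advance` still returns the row index from before the increment (as `fetch_add` did). `Cursor` is a simplified, hypothetical stand-in that models only `num_rows` and `cur_row`.

```rust
/// Hypothetical stand-in for `SortKeyCursor`: only the row bookkeeping
/// from the diff (`num_rows`, `cur_row`) is modelled.
#[derive(Debug)]
struct Cursor {
    num_rows: usize,
    cur_row: usize,
}

impl Cursor {
    fn new(num_rows: usize) -> Self {
        Cursor { num_rows, cur_row: 0 }
    }

    fn is_finished(&self) -> bool {
        self.num_rows == self.cur_row
    }

    /// Returns the current row and moves to the next one.
    ///
    /// With exclusive (`&mut self`) access a plain `usize` is enough; the
    /// previous version used `fetch_add(1, SeqCst)` on an atomic because the
    /// cursor was shared behind an `Arc`.
    fn advance(&mut self) -> usize {
        assert!(!self.is_finished());
        let row = self.cur_row;
        self.cur_row += 1;
        row
    }
}
```

Because the merge loop now owns each cursor exclusively (note `Some(mut cursor)` when popping from `min_heap`), the borrow checker rules out shared mutation at compile time, so no atomic operation is needed per row.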

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -571,9 +577,9 @@ impl SortPreservingMergeStream {
             let _timer = elapsed_compute.timer();
 
             match self.min_heap.pop() {
-                Some(cursor) => {
+                Some(mut cursor) => {
                     let stream_idx = cursor.stream_idx;
-                    let cursor_idx = self.cursors[stream_idx].len() - 1;
+                    let cursor_idx = self.batches[stream_idx].len() - 1;

Review comment:
       I think this and the corresponding field in `RowIndex` should be renamed, perhaps to `batch_idx`.
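A sketch of the suggested rename, assuming `RowIndex` carries a stream index, a position within that stream's buffered batches, and a row offset. The field layout and the `lookup` helper are hypothetical, and rows are modelled as `Vec<T>` instead of `RecordBatch` to keep the example dependency-free. With the rename, the line in the diff would read `let batch_idx = self.batches[stream_idx].len() - 1;`.

```rust
use std::collections::VecDeque;

/// Identifies one buffered row (sketch; field names follow the review
/// suggestion of `batch_idx` in place of `cursor_idx`).
#[derive(Debug, Clone, Copy, PartialEq)]
struct RowIndex {
    /// Which input stream the row came from
    stream_idx: usize,
    /// Position of the batch within `batches[stream_idx]`
    batch_idx: usize,
    /// Row offset within that batch
    row_idx: usize,
}

/// Resolve a `RowIndex` against per-stream buffered batches.
/// Returns `None` if any of the three indices is out of range.
fn lookup<T>(batches: &[VecDeque<Vec<T>>], idx: RowIndex) -> Option<&T> {
    batches
        .get(idx.stream_idx)?
        .get(idx.batch_idx)?
        .get(idx.row_idx)
}
```

Each field now names exactly the container it indexes, which is the confusion the rename is meant to remove.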

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -414,25 +424,23 @@ impl SortPreservingMergeStream {
                 return Poll::Ready(Err(e));
             }
             Some(Ok(batch)) => {
-                let cursor = Arc::new(
-                    match SortKeyCursor::new(
-                        idx,
-                        self.next_batch_index, // assign this batch an ID
-                        Arc::new(batch),
-                        &self.column_expressions,
-                        self.sort_options.clone(),
-                    ) {
-                        Ok(cursor) => cursor,
-                        Err(e) => {
-                            return Poll::Ready(Err(ArrowError::ExternalError(
-                                Box::new(e),
-                            )));
-                        }
-                    },
-                );
+                let batch = Arc::new(batch);
+                let cursor = match SortKeyCursor::new(
+                    idx,
+                    self.next_batch_index, // assign this batch an ID

Review comment:
       I think now that we have an array called `batches`, it will be less confusing for this concept to be renamed to `batch_id`; otherwise it might be confused with an index into `batches`, as opposed to an id used to cache comparators.
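The distinction drawn here can be shown with a small model. It assumes ids come from a monotonically increasing counter, like `self.next_batch_index` in the diff, and that exhausted batches are popped off the front of a `VecDeque`. `BufferedBatch`, `Buffers`, `push`, and `index_of` are hypothetical names used only for illustration.

```rust
use std::collections::VecDeque;

/// A buffered batch tagged with a unique, never-reused id.
#[allow(dead_code)]
struct BufferedBatch {
    batch_id: usize,
    rows: Vec<i64>,
}

struct Buffers {
    next_batch_id: usize,
    batches: VecDeque<BufferedBatch>,
}

impl Buffers {
    fn new() -> Self {
        Buffers { next_batch_id: 0, batches: VecDeque::new() }
    }

    /// Buffer a batch and return the id assigned to it.
    fn push(&mut self, rows: Vec<i64>) -> usize {
        let batch_id = self.next_batch_id; // assign this batch an ID
        self.next_batch_id += 1;
        self.batches.push_back(BufferedBatch { batch_id, rows });
        batch_id
    }

    /// Current position in `batches` of the batch with this id, if it is
    /// still buffered. Positions shift as batches are popped; ids never do,
    /// which is why an id (not an index) is the right key for a cache.
    fn index_of(&self, batch_id: usize) -> Option<usize> {
        self.batches.iter().position(|b| b.batch_id == batch_id)
    }
}
```

After the first batch is popped, the second batch moves from index 1 to index 0 but keeps id 1, so a comparator cached under its id stays valid while one cached under its index would not.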

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -280,7 +287,11 @@ pub(crate) struct SortPreservingMergeStream {
     ///
     /// Exhausted cursors will be popped off the front once all
     /// their rows have been yielded to the output
-    cursors: Vec<VecDeque<Arc<SortKeyCursor>>>,
+    batches: Vec<VecDeque<Arc<RecordBatch>>>,

Review comment:
       I wonder if the `Arc<RecordBatch>` could be removed from `SortKeyCursor`, so that this could just be `Vec<VecDeque<RecordBatch>>`. `SortKeyCursor::batch` now only appears to be used for debug formatting.
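One way the suggestion could look, sketched with hypothetical names: the cursor keeps only its bookkeeping, and the stream owns each batch directly with no `Arc`. `B` stands in for arrow's `RecordBatch`, and the evaluated sort-key columns a real cursor needs are omitted.

```rust
use std::collections::VecDeque;

/// Cursor without a handle to its batch: it only records which stream and
/// batch it belongs to and where it is. (Sort-key columns omitted.)
#[derive(Debug)]
struct SortKeyCursorSketch {
    stream_idx: usize,
    batch_id: usize,
    cur_row: usize,
    num_rows: usize,
}

/// The merge state owns batches outright: `Vec<VecDeque<B>>`, no `Arc`.
struct MergeState<B> {
    batches: Vec<VecDeque<B>>,
}

impl<B> MergeState<B> {
    /// Buffer `batch` for `stream_idx` and return a fresh cursor over it.
    fn push_batch(
        &mut self,
        stream_idx: usize,
        batch: B,
        num_rows: usize,
        batch_id: usize,
    ) -> SortKeyCursorSketch {
        self.batches[stream_idx].push_back(batch);
        SortKeyCursorSketch { stream_idx, batch_id, cur_row: 0, num_rows }
    }
}
```

With the batch gone from the cursor, a derived `Debug` prints only the cursor's position, not the batch contents; whether that is enough for debugging is a judgment call for the PR.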



