Posted to github@arrow.apache.org by "aprimadi (via GitHub)" <gi...@apache.org> on 2023/05/06 08:09:09 UTC

[GitHub] [arrow-datafusion] aprimadi opened a new pull request, #6260: Add more documentation to SortPreservingMergeStream

aprimadi opened a new pull request, #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260

   # Which issue does this PR close?
   
   N/A
   
   # Rationale for this change
   
   Just adding some documentation to help future readers.
   
   # What changes are included in this PR?
   
   More documentation + inline functions
   
   # Are these changes tested?
   
   No
   
   # Are there any user-facing changes?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] aprimadi commented on a diff in pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "aprimadi (via GitHub)" <gi...@apache.org>.
aprimadi commented on code in PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#discussion_r1188150462


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -506,6 +506,71 @@ mod tests {
         assert_batches_eq!(exp, collected.as_slice());
     }
 
+    #[tokio::test]
+    async fn test_merge_five_partitions() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let a: ArrayRef = Arc::new(Int32Array::from_slice([1, 2, 3, 7, 9]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from_slice([1, 3, 3, 4, 5]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from_slice([2, 4, 7, 10, 13]));
+        let b3 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from_slice([2, 3, 5, 7, 11]));
+        let b4 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from_slice([3, 5, 9, 13, 17]));
+        let b5 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let schema = b1.schema();
+        let partitions = &[vec![b1], vec![b2], vec![b3], vec![b4], vec![b5]];
+        let sort = vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: Default::default(),
+        }];
+        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        #[rustfmt::skip]
+        let expected = &[
+            "+----+",
+            "| a  |",
+            "+----+",
+            "| 1  |",
+            "| 1  |",
+            "| 2  |",
+            "| 2  |",
+            "| 2  |",
+            "| 3  |",
+            "| 3  |",
+            "| 3  |",
+            "| 3  |",
+            "| 3  |",
+            "| 4  |",
+            "| 4  |",
+            "| 5  |",
+            "| 5  |",
+            "| 5  |",
+            "| 7  |",
+            "| 7  |",
+            "| 7  |",
+            "| 9  |",
+            "| 9  |",
+            "| 10 |",
+            "| 11 |",
+            "| 13 |",
+            "| 13 |",
+            "| 17 |",
+            "+----+",
+        ];
+        let collected = collect(merge, task_ctx).await.unwrap();
+        assert_batches_eq!(expected, collected.as_slice());
+    }
+

Review Comment:
   ```suggestion
   ```





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#discussion_r1188412187


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -506,6 +506,71 @@ mod tests {
         assert_batches_eq!(exp, collected.as_slice());
     }
 
+    #[tokio::test]
+    async fn test_merge_five_partitions() {

Review Comment:
   I leave that to your judgement





[GitHub] [arrow-datafusion] aprimadi commented on pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "aprimadi (via GitHub)" <gi...@apache.org>.
aprimadi commented on PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#issuecomment-1537183461

   Perhaps I've overdone this




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#discussion_r1187729554


##########
datafusion/core/src/physical_plan/sorts/merge.rs:
##########
@@ -242,22 +260,61 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
         }
     }
 
+    /// Find the leaf node index in the loser tree for the given cursor index
+    ///
+    /// Note that this is not necessarily a leaf node in the tree, but it can
+    /// also be a half-node (a node with only one child). This happens when the
+    /// number of cursors/streams is not a power of two. Thus, the loser tree
+    /// will be unbalanced, but it will still work correctly.
+    ///
+    /// For example, with 5 streams, the loser tree will look like this:
+    ///
+    /// ```text
+    ///           0 (winner)
+    ///
+    ///           1
+    ///        /     \
+    ///       2       3
+    ///     /  \     / \
+    ///    4    |   |   |
+    ///   / \   |   |   |
+    /// -+---+--+---+---+---- Below is not a part of loser tree
+    ///  S3 S4 S0   S1  S2
+    /// ```
+    ///
+    /// S0, S1, ... S4 are the streams (read: stream at index 0, stream at
+    /// index 1, etc.)
+    ///
+    /// Zooming in at node 2 in the loser tree as an example, we can see that
+    /// it takes as input the next item at (S0) and the loser of (S3, S4).
+    ///
+    #[inline]
+    fn lt_leaf_node_index(&self, cursor_index: usize) -> usize {
+        (self.cursors.len() + cursor_index) / 2
+    }
+
+    /// Find the parent node index for the given node index

Review Comment:
   👍  I like the refactoring of this functionality as a way to document what is going on
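
The index arithmetic in the doc comment above can be checked against the 5-stream diagram with a small standalone sketch. The free functions below are hypothetical stand-ins for the methods in the diff: `lt_leaf_node_index` copies the formula shown there, while `lt_parent_node_index` assumes the parent of node `i` is `i / 2`, since the diff excerpt shows only that helper's doc comment and not its body. `path_to_root` is an illustrative helper and is not part of the PR.

```rust
/// Stand-in for the method in the diff: the loser-tree node that a
/// stream (cursor) feeds into directly, for `num_cursors` streams.
fn lt_leaf_node_index(num_cursors: usize, cursor_index: usize) -> usize {
    (num_cursors + cursor_index) / 2
}

/// ASSUMPTION: the parent of a node is found by integer halving.
/// The diff excerpt does not show the body of this helper.
fn lt_parent_node_index(node_index: usize) -> usize {
    node_index / 2
}

/// Illustrative helper (not in the PR): the nodes visited when a new
/// item from `cursor_index` is replayed from its entry node up to node 1.
fn path_to_root(num_cursors: usize, cursor_index: usize) -> Vec<usize> {
    let mut node = lt_leaf_node_index(num_cursors, cursor_index);
    let mut path = vec![node];
    while node > 1 {
        node = lt_parent_node_index(node);
        path.push(node);
    }
    path
}
```

With 5 streams, S0 enters at node 2, S1 and S2 at node 3, and S3 and S4 at node 4, which matches the diagram: node 2 compares the next item of S0 against the result of node 4.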



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -506,6 +506,71 @@ mod tests {
         assert_batches_eq!(exp, collected.as_slice());
     }
 
+    #[tokio::test]
+    async fn test_merge_five_partitions() {

Review Comment:
   I wonder if you can leave some comments about why this test was added / what it is covering to help future readers?
   
   I think the coverage of merging is pretty good with this: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/merge_fuzz.rs





[GitHub] [arrow-datafusion] aprimadi commented on a diff in pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "aprimadi (via GitHub)" <gi...@apache.org>.
aprimadi commented on code in PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#discussion_r1188097449


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -506,6 +506,71 @@ mod tests {
         assert_batches_eq!(exp, collected.as_slice());
     }
 
+    #[tokio::test]
+    async fn test_merge_five_partitions() {

Review Comment:
   Ah yes, I added that test just to convince myself that the loser tree implementation works.
   
   Now that I'm pretty sure it works, perhaps we can remove it?





[GitHub] [arrow-datafusion] alamb commented on pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#issuecomment-1538796264

   > Perhaps I've overdone this
   
   No, I think this is great. Thank you very much.




[GitHub] [arrow-datafusion] alamb merged pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260




[GitHub] [arrow-datafusion] alamb commented on pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#issuecomment-1540396680

   Thanks again @aprimadi 




[GitHub] [arrow-datafusion] aprimadi commented on pull request #6260: Add more documentation to SortPreservingMergeStream

Posted by "aprimadi (via GitHub)" <gi...@apache.org>.
aprimadi commented on PR #6260:
URL: https://github.com/apache/arrow-datafusion/pull/6260#issuecomment-1537161282

   Adding a test just to convince myself that the loser tree implementation works even with an unbalanced / incomplete binary tree.
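
For readers who want to cross-check the expected output of `test_merge_five_partitions` without running DataFusion, here is a rough standalone sketch. It does not use a loser tree: it swaps in a plain binary min-heap from the Rust standard library, which should produce the same sorted values for a single integer column. The names `merge_sorted` and `five_partitions` are hypothetical and do not appear in the PR.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted inputs using a binary min-heap
/// (NOT the loser tree used by SortPreservingMergeStream).
fn merge_sorted(inputs: &[Vec<i32>]) -> Vec<i32> {
    // Heap entries are (value, input index, offset within that input).
    // `Reverse` turns the std max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }

    let mut out = Vec::with_capacity(inputs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap from the input that just produced the minimum.
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// The five input partitions, copied from the test in the diff.
fn five_partitions() -> Vec<Vec<i32>> {
    vec![
        vec![1, 2, 3, 7, 9],
        vec![1, 3, 3, 4, 5],
        vec![2, 4, 7, 10, 13],
        vec![2, 3, 5, 7, 11],
        vec![3, 5, 9, 13, 17],
    ]
}
```

Calling `merge_sorted(&five_partitions())` yields 25 values in the same order as the rows of the expected table in the test. Five inputs is a useful case because it is not a power of two, which is the unbalanced-tree situation the test was written to exercise.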

