Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/20 18:24:09 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2296: Simplify sort streams

tustvold opened a new pull request, #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296

   # Which issue does this PR close?
   
   Part of #2201
   
   # Rationale for this change
   
   In preparation for making tokio an optional dependency of SortPreservingMerge, this PR shuffles around some of the stream plumbing.
   
   # What changes are included in this PR?
   
   * Changes `spawn_execution` to use `tokio::sync::mpsc` (instead of `futures::channel::mpsc`) for consistency with `RecordBatchReceiverStream`
   * Removes the `StreamWrapper` indirection
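The producer/consumer shape behind `spawn_execution` can be sketched with the standard library alone. This is a synchronous analogue: `std::thread` and `std::sync::mpsc` stand in for a spawned tokio task and `tokio::sync::mpsc`, and `spawn_partition` and the `Vec<i32>` "batches" are hypothetical, not DataFusion APIs.

```rust
use std::sync::mpsc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a record batch.
type Batch = Vec<i32>;

/// Hypothetical analogue of spawn_execution: spawns a producer that sends
/// each batch of one partition over a bounded channel, and returns the
/// receiving end together with the producer's join handle.
fn spawn_partition(batches: Vec<Batch>, capacity: usize) -> (mpsc::Receiver<Batch>, JoinHandle<()>) {
    let (tx, rx) = mpsc::sync_channel(capacity);
    let handle = thread::spawn(move || {
        for batch in batches {
            // Stop early if the consumer has hung up.
            if tx.send(batch).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends iteration on the receiver.
    });
    (rx, handle)
}

fn main() {
    let (rx, handle) = spawn_partition(vec![vec![1, 2], vec![3]], 1);
    let received: Vec<Batch> = rx.iter().collect();
    handle.join().unwrap();
    println!("{:?}", received); // [[1, 2], [3]]
}
```

In this sketch the bounded channel provides backpressure: the producer blocks once `capacity` unconsumed batches are queued.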
   
   # Are there any user-facing changes?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854570094


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -354,17 +333,13 @@ impl SortPreservingMergeStream {
             .map(|_| VecDeque::new())
             .collect();
         tracking_metrics.init_mem_used(streams.iter().map(|s| s.mem_used).sum());
-        let wrappers = streams
-            .into_iter()
-            .map(|s| StreamWrapper::Stream(Some(s)))
-            .collect();
+        let wrappers = streams.into_iter().map(|s| s.stream.fuse()).collect();

Review Comment:
   Is `fuse` the magic that avoids the need for the stream wrapper?
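`fuse` guarantees that once the underlying stream returns `None` it keeps returning `None`, which is the bookkeeping the removed `StreamWrapper::Stream(Some(s))` state appears to have done by hand. The standard library's `Iterator::fuse` has the same contract as `futures`' `StreamExt::fuse`, so a synchronous sketch shows the idea; `Flaky` is a hypothetical iterator that resumes after reporting completion.

```rust
/// Hypothetical iterator that returns None once and then yields again.
/// The futures `Stream` contract likewise leaves behavior after completion
/// unspecified, which is why an unfused input needs extra state.
struct Flaky {
    step: u32,
}

impl Iterator for Flaky {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.step += 1;
        match self.step {
            1 => Some(10),
            2 => None,     // reports "finished"...
            3 => Some(20), // ...but yields again if polled
            _ => None,
        }
    }
}

fn main() {
    let raw: Vec<Option<u32>> = {
        let mut it = Flaky { step: 0 };
        (0..3).map(|_| it.next()).collect()
    };
    let fused: Vec<Option<u32>> = {
        let mut it = Flaky { step: 0 }.fuse();
        (0..3).map(|_| it.next()).collect()
    };
    println!("raw:   {:?}", raw);   // [Some(10), None, Some(20)]
    println!("fused: {:?}", fused); // [Some(10), None, None]
}
```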





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r855027332


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -354,17 +333,13 @@ impl SortPreservingMergeStream {
             .map(|_| VecDeque::new())
             .collect();
         tracking_metrics.init_mem_used(streams.iter().map(|s| s.mem_used).sum());
-        let wrappers = streams
-            .into_iter()
-            .map(|s| StreamWrapper::Stream(Some(s)))
-            .collect();
+        let wrappers = streams.into_iter().map(|s| s.stream.fuse()).collect();

Review Comment:
   `RecordBatchReceiverStream` is the thing I was missing





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854609150


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -354,17 +333,13 @@ impl SortPreservingMergeStream {
             .map(|_| VecDeque::new())
             .collect();
         tracking_metrics.init_mem_used(streams.iter().map(|s| s.mem_used).sum());
-        let wrappers = streams
-            .into_iter()
-            .map(|s| StreamWrapper::Stream(Some(s)))
-            .collect();
+        let wrappers = streams.into_iter().map(|s| s.stream.fuse()).collect();

Review Comment:
   It's the combination of this and using `RecordBatchReceiverStream` to convert the mpsc receiver into a `SendableRecordBatchStream`.
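The combination described here, wrapping each channel's receiving end in a type that is itself a stream and then fusing it, can be sketched synchronously. `ReceiverIter` and `partition_stream` are hypothetical names, and std's `Receiver` and `JoinHandle` stand in for the tokio channel and task handle; this is not the actual `RecordBatchReceiverStream` implementation.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical analogue of RecordBatchReceiverStream: one value that owns
/// both the channel receiver and the producer's join handle, and exposes
/// them as a single iterator of items.
struct ReceiverIter<T> {
    rx: Receiver<T>,
    _producer: JoinHandle<()>,
}

impl<T> Iterator for ReceiverIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        // recv() returns Err once every sender has been dropped.
        self.rx.recv().ok()
    }
}

/// Spawns a producer for one input and returns it as a self-contained iterator.
fn partition_stream(items: Vec<i32>) -> ReceiverIter<i32> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for item in items {
            let _ = tx.send(item);
        }
    });
    ReceiverIter { rx, _producer: producer }
}

fn main() {
    // Mirrors `streams.into_iter().map(|s| s.stream.fuse())`:
    // each input becomes a fused, self-contained iterator.
    let inputs: Vec<_> = vec![vec![1, 4], vec![2, 3]]
        .into_iter()
        .map(|v| partition_stream(v).fuse())
        .collect();
    for s in inputs {
        println!("{:?}", s.collect::<Vec<_>>());
    }
}
```

Because each wrapper owns its own producer handle, the merge side only has to hold a `Vec` of fused iterators and needs no separate wrapper enum.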





[GitHub] [arrow-datafusion] yjshen merged pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
yjshen merged PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854432705


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -269,9 +284,6 @@ pub(crate) struct SortPreservingMergeStream {
     /// The sorted input streams to merge together
     streams: MergingStreams,
 
-    /// Drop helper for tasks feeding the input [`streams`](Self::streams)
-    _drop_helper: AbortOnDropMany<()>,

Review Comment:
   We no longer need this, as each individual `RecordBatchReceiverStream` now handles aborting its own task on drop.
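`AbortOnDropMany` is a drop helper for the tasks feeding the input streams; moving that responsibility into each receiver stream means every input stops its own producer when dropped. Threads in `std` cannot be aborted, so this sketch swaps in cooperative cancellation through an `AtomicBool`, and unlike a tokio abort it waits for the producer to exit. `CancelOnDrop` is a hypothetical type, not the DataFusion helper.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical per-stream drop helper: signals its producer to stop when
/// dropped, then joins the producer thread.
struct CancelOnDrop {
    cancelled: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl CancelOnDrop {
    /// Spawns a producer loop that runs until cancelled, then sets `exited`.
    fn spawn(exited: Arc<AtomicBool>) -> Self {
        let cancelled = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancelled);
        let handle = thread::spawn(move || {
            while !flag.load(Ordering::Acquire) {
                // Stand-in for producing batches.
                thread::sleep(Duration::from_millis(1));
            }
            exited.store(true, Ordering::Release);
        });
        CancelOnDrop { cancelled, handle: Some(handle) }
    }
}

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

fn main() {
    let exited = Arc::new(AtomicBool::new(false));
    let guard = CancelOnDrop::spawn(Arc::clone(&exited));
    // The producer only exits after cancellation, so this is still false.
    assert!(!exited.load(Ordering::Acquire));
    drop(guard);
    println!("producer exited: {}", exited.load(Ordering::Acquire)); // true
}
```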





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854432161


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -196,16 +197,23 @@ impl ExecutionPlan for SortPreservingMergeExec {
                             part_i,
                             context.clone(),
                         );
-                        (receiver, join_handle)
+
+                        SortedStream::new(
+                            RecordBatchReceiverStream::create(
+                                &schema,
+                                receiver,
+                                join_handle,
+                            ),
+                            0,

Review Comment:
   I think specifying 0 here is ok, but perhaps @yjshen could confirm?





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854433155


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -22,8 +22,8 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::Poll;
 
-use futures::channel::mpsc;
 use futures::Stream;
+use tokio::sync::mpsc;

Review Comment:
   This import changes because `spawn_execution` now uses `tokio::sync::mpsc`.



##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -180,26 +179,23 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 }
 
-pin_project! {
-    struct MergeStream {

Review Comment:
   Drive-by cleanup: this isn't necessary, as the types are `Unpin`.
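When every field of a struct is `Unpin`, the struct is `Unpin` too, so its `poll` method can get `&mut Self` with `Pin::get_mut` and re-pin a field with `Pin::new`, with no `pin_project!` needed. To stay dependency-free this sketch uses `std::future::Future` rather than `Stream` (which lives in the `futures` crate); `Doubler`, `noop_waker`, and `poll_once` are hypothetical helpers.

```rust
use std::future::{self, Future, Ready};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical wrapper future. Its only field, `Ready<i32>`, is `Unpin`,
/// so `Doubler` is `Unpin` as well and needs no pin projection.
struct Doubler {
    inner: Ready<i32>,
}

impl Future for Doubler {
    type Output = i32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        // Allowed without unsafe because Self: Unpin.
        let this = self.get_mut();
        // Pin::new is allowed because Ready<i32>: Unpin.
        Pin::new(&mut this.inner).poll(cx).map(|v| v * 2)
    }
}

/// Minimal no-op waker so a future can be polled without an executor.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the data pointer and does nothing.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Polls an `Unpin` future exactly once.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

fn main() {
    let mut fut = Doubler { inner: future::ready(21) };
    println!("{:?}", poll_once(&mut fut)); // Ready(42)
}
```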





[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2296: Simplify sort streams

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2296:
URL: https://github.com/apache/arrow-datafusion/pull/2296#discussion_r854699305


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -196,16 +197,23 @@ impl ExecutionPlan for SortPreservingMergeExec {
                             part_i,
                             context.clone(),
                         );
-                        (receiver, join_handle)
+
+                        SortedStream::new(
+                            RecordBatchReceiverStream::create(
+                                &schema,
+                                receiver,
+                                join_handle,
+                            ),
+                            0,

Review Comment:
   Yes. This field is `mem_used` for `SortedStream`. This 0 is consistent with what we report for receivers: we currently do not count the memory used by the single batch returned from each stream's `next()`.
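The `0` passed to `SortedStream::new` feeds the `init_mem_used` call in the merge stream's constructor, which sums `mem_used` over all inputs. A sketch of that arithmetic, assuming a simplified, hypothetical `SortedStream` that keeps only a `mem_used: usize` field (the real struct also carries the stream itself):

```rust
/// Simplified, hypothetical SortedStream: only the memory-accounting field.
struct SortedStream {
    mem_used: usize,
}

impl SortedStream {
    fn new(mem_used: usize) -> Self {
        SortedStream { mem_used }
    }
}

/// Mirrors `streams.iter().map(|s| s.mem_used).sum()` from the diff.
fn initial_mem_used(streams: &[SortedStream]) -> usize {
    streams.iter().map(|s| s.mem_used).sum()
}

fn main() {
    // Receiver-backed inputs report 0 and add nothing to the total;
    // an input backed by already-buffered data would report its size.
    let streams = vec![SortedStream::new(0), SortedStream::new(0), SortedStream::new(1024)];
    println!("{}", initial_mem_used(&streams)); // 1024
}
```

With every input reporting 0, the initial total is 0, so the in-flight batch each receiver yields goes unaccounted, as described in the comment.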


