Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/13 18:55:39 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7958: ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec

alamb commented on a change in pull request #7958:
URL: https://github.com/apache/arrow/pull/7958#discussion_r470174139



##########
File path: rust/datafusion/src/execution/physical_plan/limit.rs
##########
@@ -81,39 +82,40 @@ struct LimitPartition {
 
 impl Partition for LimitPartition {
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        // collect up to "limit" rows on each partition
-        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+        // apply limit in parallel across all input partitions
+        let local_limit = self
             .partitions
             .iter()
             .map(|p| {
-                let p = p.clone();
-                let limit = self.limit;
-                thread::spawn(move || {
-                    let it = p.execute()?;
-                    collect_with_limit(it, limit)
-                })
+                Arc::new(LocalLimitExec::new(
+                    p.clone(),
+                    self.schema.clone(),
+                    self.limit,
+                )) as Arc<dyn Partition>
             })
             .collect();
 
-        // combine the results from each thread, up to the limit
+        // limit needs to collapse inputs down to a single partition
+        let merge = MergeExec::new(self.schema.clone(), local_limit);
+        // MergeExec has a single partition
+        let it = merge.partitions()?[0].execute()?;

Review comment:
       I wonder if we should actually assert here that `MergeExec` does indeed produce only a single partition, perhaps with an `assert_eq!(partitions.len(), 1)` style check. I am thinking of the future case where someone changes `MergeExec` to work differently and produce multiple partitions, and then someone has to spend time tracking down a tricky wrong-results bug.
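
To make the suggestion concrete, here is a minimal sketch of such a check. It is not DataFusion code: `expect_single_partition` is a hypothetical helper, the partition type is left generic, and the error is a plain `String` rather than the crate's own error type. It returns an error instead of panicking, which is one possible alternative to `assert_eq!`.

```rust
/// Hypothetical helper (not part of DataFusion): returns the only element
/// of `partitions`, or an error saying how many partitions were found.
fn expect_single_partition<T>(mut partitions: Vec<T>) -> Result<T, String> {
    if partitions.len() != 1 {
        return Err(format!(
            "expected MergeExec to produce exactly 1 partition, got {}",
            partitions.len()
        ));
    }
    // The length was checked above, so index 0 exists.
    Ok(partitions.remove(0))
}
```

The call site in the diff would then take the checked partition instead of indexing `[0]` directly, so a future change to `MergeExec` surfaces as an explicit error rather than silently dropped rows.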

##########
File path: rust/datafusion/src/execution/physical_plan/limit.rs
##########
@@ -81,39 +82,40 @@ struct LimitPartition {
 
 impl Partition for LimitPartition {
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        // collect up to "limit" rows on each partition
-        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+        // apply limit in parallel across all input partitions
+        let local_limit = self
             .partitions
             .iter()
             .map(|p| {
-                let p = p.clone();
-                let limit = self.limit;
-                thread::spawn(move || {
-                    let it = p.execute()?;
-                    collect_with_limit(it, limit)
-                })
+                Arc::new(LocalLimitExec::new(

Review comment:
       This "pick the top N from each partition and then pick the top N from the merged result" approach is an example of operator pushdown. It is cool, and I wonder if there would be some way to represent it in the planner (rather than at execution time).
   
   I am thinking, for example, of pushing the limit down past `Projection`s.
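
To illustrate the pattern, here is a small self-contained sketch that uses plain vectors in place of record batches. The function names are hypothetical and none of this is the DataFusion API. It assumes partitions are concatenated in order, whereas a real merge may interleave them, so only the row count is guaranteed to match in general.

```rust
/// Keep at most `limit` rows of a single partition (the "local" limit).
fn local_limit(partition: &[i32], limit: usize) -> Vec<i32> {
    partition.iter().copied().take(limit).collect()
}

/// Apply the limit to each partition, concatenate the survivors in
/// partition order, then apply the limit once more to the merged rows.
fn limit_with_pushdown(partitions: &[Vec<i32>], limit: usize) -> Vec<i32> {
    partitions
        .iter()
        .flat_map(|p| local_limit(p, limit))
        .take(limit)
        .collect()
}

/// Reference behaviour: merge every row first, then limit once.
fn limit_without_pushdown(partitions: &[Vec<i32>], limit: usize) -> Vec<i32> {
    partitions.iter().flatten().copied().take(limit).collect()
}
```

Both functions return the same rows under the in-order assumption, but the pushdown version never carries more than `limit` rows per partition into the merge step. That is the property a planner rule could exploit.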



