You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/22 22:45:54 UTC

[GitHub] [arrow] andygrove opened a new pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

andygrove opened a new pull request #8029:
URL: https://github.com/apache/arrow/pull/8029


   This follows on from https://github.com/apache/arrow/pull/8028.
   
   - Removes `Partition` trait, which was really redundant.
   - `ExecutonPlan.execute()` now takes a partition index.
   - Removed `DatasourceExec` since no longer needed now that `TableProvider.scan` returns an `ExecutionPlan` directly.
   
   This is a step towards extracting the threading model out of the operators like `MergeExec` and having the operators be able to work with different threading models.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#discussion_r475168077



##########
File path: rust/datafusion/src/execution/physical_plan/sort.rs
##########
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
         self.input.schema().clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![
-            (Arc::new(SortPartition {
-                input: self.input.partitions()?,
-                expr: self.expr.clone(),
-                schema: self.schema(),
-                concurrency: self.concurrency,
-            })),
-        ])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)

Review comment:
       To make sure I am following: this is 1 because we pull everything to the same partition in a sort via `Merge`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#issuecomment-678784408


   > This is an impressive simplification and improvement. Really great work, @andygrove !
   > 
   > I went through it and could not find any issue with it, only benefits.
   > 
   > To make sure I understood this:
   > 
   >     * scans infer the number of partitions based on the number of files they scan
   > 
   >     * the number of partitions is annotated in `Partitioning::UnknownPartitioning(N)`
   > 
   >     * the number of partitions is exposed to others via `output_partitioning`
   > 
   >     * others use `output_partitioning` to know up to which partition number they need to run against to run through all input partitions.
   > 
   > 
   > The invariant is that the maximum number that `execute(...)` accepts equals to the number exposed in `output_partitioning`, i.e. if a plan passes through that number, a `panic!` happens.
   
   Yes, your understanding is correct on all of these points.
   
    
   > I could think of an alternative approach where a plan outputs an iterator over partitions, so that others can iterate against (which would avoid us having to guarantee the invariant mentioned above through discipline), but you probably already though through this alternative :)
   
   Typically, we want to try and execute the partitions in parallel, either on separate threads in a single process or in separate processes in a cluster. We may want to use `async` for the execute method in the future.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove closed pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8029:
URL: https://github.com/apache/arrow/pull/8029


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#issuecomment-678728434


   Yes, exactly. We could do more efficient things in the future such as
   perform sorts in parallel and then do a sort-merge join on the results.
   
   On Sat, Aug 22, 2020, 10:19 PM Jorge Leitao <no...@github.com>
   wrote:
   
   > *@jorgecarleitao* commented on this pull request.
   > ------------------------------
   >
   > In rust/datafusion/src/execution/physical_plan/sort.rs
   > <https://github.com/apache/arrow/pull/8029#discussion_r475168077>:
   >
   > > @@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
   >          self.input.schema().clone()
   >      }
   >
   > -    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
   > -        Ok(vec![
   > -            (Arc::new(SortPartition {
   > -                input: self.input.partitions()?,
   > -                expr: self.expr.clone(),
   > -                schema: self.schema(),
   > -                concurrency: self.concurrency,
   > -            })),
   > -        ])
   > +    /// Get the output partitioning of this plan
   > +    fn output_partitioning(&self) -> Partitioning {
   > +        Partitioning::UnknownPartitioning(1)
   >
   > To make sure I am following: this is 1 because we pull everything to the
   > same partition in a sort via Merge.
   >
   > —
   > You are receiving this because you authored the thread.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/8029#pullrequestreview-472977856>,
   > or unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AAHEBRGZMBCN76CQNAE6RDTSCCKF3ANCNFSM4QIKKJ7Q>
   > .
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#issuecomment-678704604


   @jorgecarleitao @alamb Sorry, this is a bit of a larger change than usual.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#discussion_r475225864



##########
File path: rust/datafusion/src/execution/physical_plan/sort.rs
##########
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
         self.input.schema().clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![
-            (Arc::new(SortPartition {
-                input: self.input.partitions()?,
-                expr: self.expr.clone(),
-                schema: self.schema(),
-                concurrency: self.concurrency,
-            })),
-        ])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)

Review comment:
       Yes, that's correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#issuecomment-678704633


   https://issues.apache.org/jira/browse/ARROW-9464


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#discussion_r475205751



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -350,72 +350,64 @@ impl ExecutionContext {
     }
 
     /// Execute a physical plan and collect the results in memory
-    pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
-        let partitions = plan.partitions()?;
-
-        match partitions.len() {
+    pub fn collect(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+        match plan.output_partitioning().partition_count() {
             0 => Ok(vec![]),
             1 => {
-                let it = partitions[0].execute()?;
+                let it = plan.execute(0)?;
                 common::collect(it)
             }
             _ => {
                 // merge into a single partition
                 let plan = MergeExec::new(
                     plan.schema().clone(),
-                    partitions,
+                    plan.clone(),
                     self.state
                         .lock()
                         .expect("failed to lock mutex")
                         .config
                         .concurrency,
                 );
-                let partitions = plan.partitions()?;
-                if partitions.len() == 1 {
-                    common::collect(partitions[0].execute()?)
-                } else {
-                    Err(ExecutionError::InternalError(format!(
-                        "MergeExec returned {} partitions",
-                        partitions.len()
-                    )))
-                }
+                // MergeExec must produce a single partition
+                assert_eq!(1, plan.output_partitioning().partition_count());
+                common::collect(plan.execute(0)?)
             }
         }
     }
 
     /// Execute a query and write the results to a partitioned CSV file
-    pub fn write_csv(&self, plan: &dyn ExecutionPlan, path: &str) -> Result<()> {
+    pub fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: &str) -> Result<()> {
         // create directory to contain the CSV files (one per partition)
         let path = path.to_string();
         fs::create_dir(&path)?;
 
-        let threads: Vec<JoinHandle<Result<()>>> = plan
-            .partitions()?
-            .iter()
-            .enumerate()
-            .map(|(i, p)| {
-                let p = p.clone();
-                let path = path.clone();
-                thread::spawn(move || {
-                    let filename = format!("part-{}.csv", i);
-                    let path = Path::new(&path).join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = csv::Writer::new(file);
-                    let reader = p.execute()?;
-                    let mut reader = reader.lock().unwrap();
-                    loop {
-                        match reader.next_batch() {
-                            Ok(Some(batch)) => {
-                                writer.write(&batch)?;
+        let threads: Vec<JoinHandle<Result<()>>> =

Review comment:
       Not for this PR, but the "write to CSV" might be a good thing to turn into an actual plan node with a logical and execution plan

##########
File path: rust/datafusion/src/execution/physical_plan/sort.rs
##########
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
         self.input.schema().clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![
-            (Arc::new(SortPartition {
-                input: self.input.partitions()?,
-                expr: self.expr.clone(),
-                schema: self.schema(),
-                concurrency: self.concurrency,
-            })),
-        ])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)

Review comment:
       I think it is because the `Sort` execution plan node produces the output in a big single logical chunk (aka you can't produce parts of the output of the sort in parallel), which is perhaps what you are saying

##########
File path: rust/datafusion/src/datasource/csv.rs
##########
@@ -43,14 +43,15 @@ use std::string::String;
 use std::sync::Arc;
 
 use crate::datasource::TableProvider;
-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::csv::CsvExec;
 pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::{common, ExecutionPlan};
 
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
-    filename: String,
+    /// Path to a single CSV file or a directory containing one of more CSV files

Review comment:
       👍 this had confused me in the past




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org