You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/23 11:22:44 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8029: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

alamb commented on a change in pull request #8029:
URL: https://github.com/apache/arrow/pull/8029#discussion_r475205751



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -350,72 +350,64 @@ impl ExecutionContext {
     }
 
     /// Execute a physical plan and collect the results in memory
-    pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
-        let partitions = plan.partitions()?;
-
-        match partitions.len() {
+    pub fn collect(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+        match plan.output_partitioning().partition_count() {
             0 => Ok(vec![]),
             1 => {
-                let it = partitions[0].execute()?;
+                let it = plan.execute(0)?;
                 common::collect(it)
             }
             _ => {
                 // merge into a single partition
                 let plan = MergeExec::new(
                     plan.schema().clone(),
-                    partitions,
+                    plan.clone(),
                     self.state
                         .lock()
                         .expect("failed to lock mutex")
                         .config
                         .concurrency,
                 );
-                let partitions = plan.partitions()?;
-                if partitions.len() == 1 {
-                    common::collect(partitions[0].execute()?)
-                } else {
-                    Err(ExecutionError::InternalError(format!(
-                        "MergeExec returned {} partitions",
-                        partitions.len()
-                    )))
-                }
+                // MergeExec must produce a single partition
+                assert_eq!(1, plan.output_partitioning().partition_count());
+                common::collect(plan.execute(0)?)
             }
         }
     }
 
     /// Execute a query and write the results to a partitioned CSV file
-    pub fn write_csv(&self, plan: &dyn ExecutionPlan, path: &str) -> Result<()> {
+    pub fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: &str) -> Result<()> {
         // create directory to contain the CSV files (one per partition)
         let path = path.to_string();
         fs::create_dir(&path)?;
 
-        let threads: Vec<JoinHandle<Result<()>>> = plan
-            .partitions()?
-            .iter()
-            .enumerate()
-            .map(|(i, p)| {
-                let p = p.clone();
-                let path = path.clone();
-                thread::spawn(move || {
-                    let filename = format!("part-{}.csv", i);
-                    let path = Path::new(&path).join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = csv::Writer::new(file);
-                    let reader = p.execute()?;
-                    let mut reader = reader.lock().unwrap();
-                    loop {
-                        match reader.next_batch() {
-                            Ok(Some(batch)) => {
-                                writer.write(&batch)?;
+        let threads: Vec<JoinHandle<Result<()>>> =

Review comment:
       Not for this PR, but the "write to CSV" might be a good thing to turn into an actual plan node with a logical and execution plan

##########
File path: rust/datafusion/src/execution/physical_plan/sort.rs
##########
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
         self.input.schema().clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![
-            (Arc::new(SortPartition {
-                input: self.input.partitions()?,
-                expr: self.expr.clone(),
-                schema: self.schema(),
-                concurrency: self.concurrency,
-            })),
-        ])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)

Review comment:
       I think it is because the `Sort` execution plan node produces the output in a big single logical chunk (aka you can't produce parts of the output of the sort in parallel), which is perhaps what you are saying

##########
File path: rust/datafusion/src/datasource/csv.rs
##########
@@ -43,14 +43,15 @@ use std::string::String;
 use std::sync::Arc;
 
 use crate::datasource::TableProvider;
-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::csv::CsvExec;
 pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::{common, ExecutionPlan};
 
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
-    filename: String,
+    /// Path to a single CSV file or a directory containing one of more CSV files

Review comment:
       👍 this had confused me in the past




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org