You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing list archive.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/14 05:11:29 UTC

[arrow] branch master updated: ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3368159  ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators
3368159 is described below

commit 3368159269130239ae8b469544dad813c4ade042
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 23:10:52 2020 -0600

    ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators
    
    Closes #7962 from andygrove/datasource-scan-partition
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/datasource/csv.rs              | 13 +++----
 rust/datafusion/src/datasource/datasource.rs       | 10 ++----
 rust/datafusion/src/datasource/memory.rs           | 31 ++++++++--------
 rust/datafusion/src/datasource/mod.rs              |  2 +-
 rust/datafusion/src/datasource/parquet.rs          | 39 ++++++++++----------
 rust/datafusion/src/execution/context.rs           | 14 +++++---
 .../src/execution/physical_plan/datasource.rs      | 41 +++-------------------
 7 files changed, 58 insertions(+), 92 deletions(-)

diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 225ebfb..b9fea5e 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -42,11 +42,11 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
 use std::string::String;
 use std::sync::Arc;
 
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::execution::physical_plan::csv::CsvExec;
 pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
 
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
@@ -84,7 +84,7 @@ impl TableProvider for CsvFile {
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-    ) -> Result<Vec<ScanResult>> {
+    ) -> Result<Vec<Arc<dyn Partition>>> {
         let exec = CsvExec::try_new(
             &self.filename,
             CsvReadOptions::new()
@@ -95,12 +95,7 @@ impl TableProvider for CsvFile {
             projection.clone(),
             batch_size,
         )?;
-        let partitions = exec.partitions()?;
-        let iterators = partitions
-            .iter()
-            .map(|p| p.execute())
-            .collect::<Result<Vec<_>>>()?;
-        Ok(iterators)
+        exec.partitions()
     }
 }
 
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 9d83023..1bfedf6 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -17,16 +17,12 @@
 
 //! Data source traits
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatchReader;
 
 use crate::error::Result;
-
-/// Returned by implementors of `Table#scan`, this `SendableRecordBatchReader` is wrapped with
-/// an `Arc` and `Mutex` so that it can be shared across threads as it is used.
-pub type ScanResult = Arc<Mutex<dyn RecordBatchReader + Send + Sync>>;
+use crate::execution::physical_plan::Partition;
 
 /// Source table
 pub trait TableProvider {
@@ -39,5 +35,5 @@ pub trait TableProvider {
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-    ) -> Result<Vec<ScanResult>>;
+    ) -> Result<Vec<Arc<dyn Partition>>>;
 }
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 5f2a5b7..2a98580 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -24,10 +24,10 @@ use std::sync::Arc;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
 use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::memory::MemoryExec;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
 
 /// In-memory table
 pub struct MemTable {
@@ -60,12 +60,14 @@ impl MemTable {
         let partitions = t.scan(&None, 1024 * 1024)?;
 
         let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
-        for it in &partitions {
-            let mut partition = vec![];
-            while let Ok(Some(batch)) = it.lock().unwrap().next_batch() {
-                partition.push(batch);
+        for partition in &partitions {
+            let it = partition.execute()?;
+            let mut it = it.lock().unwrap();
+            let mut partition_batches = vec![];
+            while let Ok(Some(batch)) = it.next_batch() {
+                partition_batches.push(batch);
             }
-            data.push(partition);
+            data.push(partition_batches);
         }
 
         MemTable::new(schema.clone(), data)
@@ -81,7 +83,7 @@ impl TableProvider for MemTable {
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
-    ) -> Result<Vec<ScanResult>> {
+    ) -> Result<Vec<Arc<dyn Partition>>> {
         let columns: Vec<usize> = match projection {
             Some(p) => p.clone(),
             None => {
@@ -114,12 +116,7 @@ impl TableProvider for MemTable {
             projected_schema,
             projection.clone(),
         )?;
-        let partitions = exec.partitions()?;
-        let iterators = partitions
-            .iter()
-            .map(|p| p.execute())
-            .collect::<Result<Vec<_>>>()?;
-        Ok(iterators)
+        exec.partitions()
     }
 }
 
@@ -151,7 +148,8 @@ mod tests {
 
         // scan with projection
         let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
-        let batch2 = partitions[0].lock().unwrap().next_batch().unwrap().unwrap();
+        let it = partitions[0].execute().unwrap();
+        let batch2 = it.lock().unwrap().next_batch().unwrap().unwrap();
         assert_eq!(2, batch2.schema().fields().len());
         assert_eq!("c", batch2.schema().field(0).name());
         assert_eq!("b", batch2.schema().field(1).name());
@@ -179,7 +177,8 @@ mod tests {
         let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
 
         let partitions = provider.scan(&None, 1024).unwrap();
-        let batch1 = partitions[0].lock().unwrap().next_batch().unwrap().unwrap();
+        let it = partitions[0].execute().unwrap();
+        let batch1 = it.lock().unwrap().next_batch().unwrap().unwrap();
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
     }
diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs
index 7ecc4a1..a3f9966 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -23,5 +23,5 @@ pub mod memory;
 pub mod parquet;
 
 pub use self::csv::{CsvBatchIterator, CsvFile, CsvReadOptions};
-pub use self::datasource::{ScanResult, TableProvider};
+pub use self::datasource::TableProvider;
 pub use self::memory::MemTable;
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index ff32327..9dc480e 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -18,13 +18,14 @@
 //! Parquet data source
 
 use std::string::String;
+use std::sync::Arc;
 
 use arrow::datatypes::*;
 
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::execution::physical_plan::parquet::ParquetExec;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
 
 /// Table-based representation of a `ParquetFile`.
 pub struct ParquetTable {
@@ -56,17 +57,11 @@ impl TableProvider for ParquetTable {
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-    ) -> Result<Vec<ScanResult>> {
+    ) -> Result<Vec<Arc<dyn Partition>>> {
         let parquet_exec =
             ParquetExec::try_new(&self.path, projection.clone(), batch_size)?;
 
-        let partitions = parquet_exec.partitions()?;
-
-        let iterators = partitions
-            .iter()
-            .map(|p| p.execute())
-            .collect::<Result<Vec<_>>>()?;
-        Ok(iterators)
+        parquet_exec.partitions()
     }
 }
 
@@ -85,7 +80,8 @@ mod tests {
 
         let projection = None;
         let scan = table.scan(&projection, 2).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
 
         let mut count = 0;
         while let Some(batch) = it.next_batch().unwrap() {
@@ -126,7 +122,8 @@ mod tests {
 
         let projection = None;
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(11, batch.num_columns());
@@ -139,7 +136,8 @@ mod tests {
 
         let projection = Some(vec![1]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
@@ -167,7 +165,8 @@ mod tests {
 
         let projection = Some(vec![0]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
@@ -192,7 +191,8 @@ mod tests {
 
         let projection = Some(vec![10]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
@@ -217,7 +217,8 @@ mod tests {
 
         let projection = Some(vec![6]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
         assert_eq!(1, batch.num_columns());
         assert_eq!(8, batch.num_rows());
@@ -244,7 +245,8 @@ mod tests {
 
         let projection = Some(vec![7]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
@@ -272,7 +274,8 @@ mod tests {
 
         let projection = Some(vec![9]);
         let scan = table.scan(&projection, 1024).unwrap();
-        let mut it = scan[0].lock().unwrap();
+        let it = scan[0].execute().unwrap();
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 8259104..c6fb38c 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -257,10 +257,16 @@ impl ExecutionContext {
                             "Table provider returned no partitions".to_string(),
                         ))
                     } else {
-                        let partition = partitions[0].lock().unwrap();
-                        let schema = partition.schema();
-                        let exec =
-                            DatasourceExec::new(schema.clone(), partitions.clone());
+                        let schema = match projection {
+                            None => provider.schema().clone(),
+                            Some(p) => Arc::new(Schema::new(
+                                p.iter()
+                                    .map(|i| provider.schema().field(*i).clone())
+                                    .collect(),
+                            )),
+                        };
+
+                        let exec = DatasourceExec::new(schema, partitions.clone());
                         Ok(Arc::new(exec))
                     }
                 }
diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs
index 73de47e..5a6fe31 100644
--- a/rust/datafusion/src/execution/physical_plan/datasource.rs
+++ b/rust/datafusion/src/execution/physical_plan/datasource.rs
@@ -19,18 +19,17 @@
 
 use std::{
     fmt::{self, Debug, Formatter},
-    sync::{Arc, Mutex},
+    sync::Arc,
 };
 
 use crate::error::Result;
 use crate::execution::physical_plan::{ExecutionPlan, Partition};
 use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatchReader;
 
 /// Datasource execution plan
 pub struct DatasourceExec {
     schema: SchemaRef,
-    partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
+    partitions: Vec<Arc<dyn Partition>>,
 }
 
 impl Debug for DatasourceExec {
@@ -44,10 +43,7 @@ impl Debug for DatasourceExec {
 
 impl DatasourceExec {
     /// Create a new data source execution plan
-    pub fn new(
-        schema: SchemaRef,
-        partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
-    ) -> Self {
+    pub fn new(schema: SchemaRef, partitions: Vec<Arc<dyn Partition>>) -> Self {
         Self { schema, partitions }
     }
 }
@@ -58,35 +54,6 @@ impl ExecutionPlan for DatasourceExec {
     }
 
     fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(self
-            .partitions
-            .iter()
-            .map(|it| {
-                Arc::new(DatasourcePartition::new(it.clone())) as Arc<dyn Partition>
-            })
-            .collect::<Vec<_>>())
-    }
-}
-
-/// Wrapper to convert a `SendableRecordBatchReader` into a `Partition`.
-pub struct DatasourcePartition {
-    batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
-}
-
-impl Debug for DatasourcePartition {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("DatasourcePartition").finish()
-    }
-}
-
-impl DatasourcePartition {
-    fn new(batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>) -> Self {
-        Self { batch_iter }
-    }
-}
-
-impl Partition for DatasourcePartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        Ok(self.batch_iter.clone())
+        Ok(self.partitions.clone())
     }
 }