You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/14 05:11:29 UTC
[arrow] branch master updated: ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3368159 ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators
3368159 is described below
commit 3368159269130239ae8b469544dad813c4ade042
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 23:10:52 2020 -0600
ARROW-9734: [Rust] [DataFusion] TableProvider.scan now returns partitions instead of iterators
Closes #7962 from andygrove/datasource-scan-partition
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/datasource/csv.rs | 13 +++----
rust/datafusion/src/datasource/datasource.rs | 10 ++----
rust/datafusion/src/datasource/memory.rs | 31 ++++++++--------
rust/datafusion/src/datasource/mod.rs | 2 +-
rust/datafusion/src/datasource/parquet.rs | 39 ++++++++++----------
rust/datafusion/src/execution/context.rs | 14 +++++---
.../src/execution/physical_plan/datasource.rs | 41 +++-------------------
7 files changed, 58 insertions(+), 92 deletions(-)
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 225ebfb..b9fea5e 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -42,11 +42,11 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
use std::string::String;
use std::sync::Arc;
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
use crate::error::Result;
use crate::execution::physical_plan::csv::CsvExec;
pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
/// Represents a CSV file with a provided schema
pub struct CsvFile {
@@ -84,7 +84,7 @@ impl TableProvider for CsvFile {
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
- ) -> Result<Vec<ScanResult>> {
+ ) -> Result<Vec<Arc<dyn Partition>>> {
let exec = CsvExec::try_new(
&self.filename,
CsvReadOptions::new()
@@ -95,12 +95,7 @@ impl TableProvider for CsvFile {
projection.clone(),
batch_size,
)?;
- let partitions = exec.partitions()?;
- let iterators = partitions
- .iter()
- .map(|p| p.execute())
- .collect::<Result<Vec<_>>>()?;
- Ok(iterators)
+ exec.partitions()
}
}
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 9d83023..1bfedf6 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -17,16 +17,12 @@
//! Data source traits
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatchReader;
use crate::error::Result;
-
-/// Returned by implementors of `Table#scan`, this `SendableRecordBatchReader` is wrapped with
-/// an `Arc` and `Mutex` so that it can be shared across threads as it is used.
-pub type ScanResult = Arc<Mutex<dyn RecordBatchReader + Send + Sync>>;
+use crate::execution::physical_plan::Partition;
/// Source table
pub trait TableProvider {
@@ -39,5 +35,5 @@ pub trait TableProvider {
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
- ) -> Result<Vec<ScanResult>>;
+ ) -> Result<Vec<Arc<dyn Partition>>>;
}
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 5f2a5b7..2a98580 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -24,10 +24,10 @@ use std::sync::Arc;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::memory::MemoryExec;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
/// In-memory table
pub struct MemTable {
@@ -60,12 +60,14 @@ impl MemTable {
let partitions = t.scan(&None, 1024 * 1024)?;
let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
- for it in &partitions {
- let mut partition = vec![];
- while let Ok(Some(batch)) = it.lock().unwrap().next_batch() {
- partition.push(batch);
+ for partition in &partitions {
+ let it = partition.execute()?;
+ let mut it = it.lock().unwrap();
+ let mut partition_batches = vec![];
+ while let Ok(Some(batch)) = it.next_batch() {
+ partition_batches.push(batch);
}
- data.push(partition);
+ data.push(partition_batches);
}
MemTable::new(schema.clone(), data)
@@ -81,7 +83,7 @@ impl TableProvider for MemTable {
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
- ) -> Result<Vec<ScanResult>> {
+ ) -> Result<Vec<Arc<dyn Partition>>> {
let columns: Vec<usize> = match projection {
Some(p) => p.clone(),
None => {
@@ -114,12 +116,7 @@ impl TableProvider for MemTable {
projected_schema,
projection.clone(),
)?;
- let partitions = exec.partitions()?;
- let iterators = partitions
- .iter()
- .map(|p| p.execute())
- .collect::<Result<Vec<_>>>()?;
- Ok(iterators)
+ exec.partitions()
}
}
@@ -151,7 +148,8 @@ mod tests {
// scan with projection
let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
- let batch2 = partitions[0].lock().unwrap().next_batch().unwrap().unwrap();
+ let it = partitions[0].execute().unwrap();
+ let batch2 = it.lock().unwrap().next_batch().unwrap().unwrap();
assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
assert_eq!("b", batch2.schema().field(1).name());
@@ -179,7 +177,8 @@ mod tests {
let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
let partitions = provider.scan(&None, 1024).unwrap();
- let batch1 = partitions[0].lock().unwrap().next_batch().unwrap().unwrap();
+ let it = partitions[0].execute().unwrap();
+ let batch1 = it.lock().unwrap().next_batch().unwrap().unwrap();
assert_eq!(3, batch1.schema().fields().len());
assert_eq!(3, batch1.num_columns());
}
diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs
index 7ecc4a1..a3f9966 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -23,5 +23,5 @@ pub mod memory;
pub mod parquet;
pub use self::csv::{CsvBatchIterator, CsvFile, CsvReadOptions};
-pub use self::datasource::{ScanResult, TableProvider};
+pub use self::datasource::TableProvider;
pub use self::memory::MemTable;
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index ff32327..9dc480e 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -18,13 +18,14 @@
//! Parquet data source
use std::string::String;
+use std::sync::Arc;
use arrow::datatypes::*;
-use crate::datasource::{ScanResult, TableProvider};
+use crate::datasource::TableProvider;
use crate::error::Result;
use crate::execution::physical_plan::parquet::ParquetExec;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{ExecutionPlan, Partition};
/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
@@ -56,17 +57,11 @@ impl TableProvider for ParquetTable {
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
- ) -> Result<Vec<ScanResult>> {
+ ) -> Result<Vec<Arc<dyn Partition>>> {
let parquet_exec =
ParquetExec::try_new(&self.path, projection.clone(), batch_size)?;
- let partitions = parquet_exec.partitions()?;
-
- let iterators = partitions
- .iter()
- .map(|p| p.execute())
- .collect::<Result<Vec<_>>>()?;
- Ok(iterators)
+ parquet_exec.partitions()
}
}
@@ -85,7 +80,8 @@ mod tests {
let projection = None;
let scan = table.scan(&projection, 2).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let mut count = 0;
while let Some(batch) = it.next_batch().unwrap() {
@@ -126,7 +122,8 @@ mod tests {
let projection = None;
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(11, batch.num_columns());
@@ -139,7 +136,8 @@ mod tests {
let projection = Some(vec![1]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
@@ -167,7 +165,8 @@ mod tests {
let projection = Some(vec![0]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
@@ -192,7 +191,8 @@ mod tests {
let projection = Some(vec![10]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
@@ -217,7 +217,8 @@ mod tests {
let projection = Some(vec![6]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(8, batch.num_rows());
@@ -244,7 +245,8 @@ mod tests {
let projection = Some(vec![7]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
@@ -272,7 +274,8 @@ mod tests {
let projection = Some(vec![9]);
let scan = table.scan(&projection, 1024).unwrap();
- let mut it = scan[0].lock().unwrap();
+ let it = scan[0].execute().unwrap();
+ let mut it = it.lock().unwrap();
let batch = it.next_batch().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 8259104..c6fb38c 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -257,10 +257,16 @@ impl ExecutionContext {
"Table provider returned no partitions".to_string(),
))
} else {
- let partition = partitions[0].lock().unwrap();
- let schema = partition.schema();
- let exec =
- DatasourceExec::new(schema.clone(), partitions.clone());
+ let schema = match projection {
+ None => provider.schema().clone(),
+ Some(p) => Arc::new(Schema::new(
+ p.iter()
+ .map(|i| provider.schema().field(*i).clone())
+ .collect(),
+ )),
+ };
+
+ let exec = DatasourceExec::new(schema, partitions.clone());
Ok(Arc::new(exec))
}
}
diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs
index 73de47e..5a6fe31 100644
--- a/rust/datafusion/src/execution/physical_plan/datasource.rs
+++ b/rust/datafusion/src/execution/physical_plan/datasource.rs
@@ -19,18 +19,17 @@
use std::{
fmt::{self, Debug, Formatter},
- sync::{Arc, Mutex},
+ sync::Arc,
};
use crate::error::Result;
use crate::execution::physical_plan::{ExecutionPlan, Partition};
use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatchReader;
/// Datasource execution plan
pub struct DatasourceExec {
schema: SchemaRef,
- partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
+ partitions: Vec<Arc<dyn Partition>>,
}
impl Debug for DatasourceExec {
@@ -44,10 +43,7 @@ impl Debug for DatasourceExec {
impl DatasourceExec {
/// Create a new data source execution plan
- pub fn new(
- schema: SchemaRef,
- partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
- ) -> Self {
+ pub fn new(schema: SchemaRef, partitions: Vec<Arc<dyn Partition>>) -> Self {
Self { schema, partitions }
}
}
@@ -58,35 +54,6 @@ impl ExecutionPlan for DatasourceExec {
}
fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(self
- .partitions
- .iter()
- .map(|it| {
- Arc::new(DatasourcePartition::new(it.clone())) as Arc<dyn Partition>
- })
- .collect::<Vec<_>>())
- }
-}
-
-/// Wrapper to convert a `SendableRecordBatchReader` into a `Partition`.
-pub struct DatasourcePartition {
- batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
-}
-
-impl Debug for DatasourcePartition {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- f.debug_struct("DatasourcePartition").finish()
- }
-}
-
-impl DatasourcePartition {
- fn new(batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>) -> Self {
- Self { batch_iter }
- }
-}
-
-impl Partition for DatasourcePartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- Ok(self.batch_iter.clone())
+ Ok(self.partitions.clone())
}
}