Posted to commits@arrow.apache.org by ag...@apache.org on 2020/05/19 13:24:11 UTC
[arrow] branch master updated: ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8fc6f53 ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan
8fc6f53 is described below
commit 8fc6f53e71135c03883e9d32957cae4f59aa6e34
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue May 19 07:23:42 2020 -0600
ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan
Add InMemoryScan to LogicalPlan. I forgot to add this variant when adding CsvScan and ParquetScan, and it turned out to be a larger change because the memory data source was a bit neglected.
This PR introduces a new physical plan for reading partitioned in-memory data sources and updates MemTable to delegate to it, adding support for partitioning as a side effect.
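To illustrate the API change, here is a minimal sketch (the column data is illustrative): the second argument to MemTable::new is now a Vec<Vec<RecordBatch>>, where each inner vector holds the batches of one partition.

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use datafusion::execution::context::ExecutionContext;

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        // outer vec: partitions; inner vec: the batches within one partition
        let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
        let mut ctx = ExecutionContext::new();
        ctx.register_table("t", Box::new(provider));
    }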
Closes #7203 from andygrove/ARROW-8822
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/examples/memory_table_api.rs | 2 +-
rust/datafusion/src/datasource/memory.rs | 87 ++++-------
rust/datafusion/src/datasource/mod.rs | 2 +-
rust/datafusion/src/execution/context.rs | 13 +-
.../src/execution/physical_plan/memory.rs | 162 +++++++++++++++++++++
rust/datafusion/src/execution/physical_plan/mod.rs | 1 +
rust/datafusion/src/logicalplan.rs | 18 +++
.../src/optimizer/projection_push_down.rs | 16 ++
8 files changed, 243 insertions(+), 58 deletions(-)
diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 2a69fe9..bfa8612 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -49,7 +49,7 @@ fn main() -> Result<()> {
let mut ctx = ExecutionContext::new();
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
- let provider = MemTable::new(schema, vec![batch])?;
+ let provider = MemTable::new(schema, vec![vec![batch]])?;
ctx.register_table("t", Box::new(provider));
let t = ctx.table("t")?;
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 787aa3b..53ca7e9 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -19,29 +19,34 @@
//! queried by DataFusion. This allows data to be pre-loaded into memory and then
//! repeatedly queried without incurring additional file I/O overhead.
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use crate::datasource::{ScanResult, TableProvider};
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::BatchIterator;
+use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::ExecutionPlan;
/// In-memory table
pub struct MemTable {
schema: Arc<Schema>,
- batches: Vec<RecordBatch>,
+ batches: Vec<Vec<RecordBatch>>,
}
impl MemTable {
/// Create a new in-memory table from the provided schema and record batches
- pub fn new(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<Self> {
- if batches
- .iter()
- .all(|batch| batch.schema().as_ref() == schema.as_ref())
- {
- Ok(Self { schema, batches })
+ pub fn new(schema: Arc<Schema>, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
+ if partitions.iter().all(|partition| {
+ partition
+ .iter()
+ .all(|batches| batches.schema().as_ref() == schema.as_ref())
+ }) {
+ Ok(Self {
+ schema,
+ batches: partitions,
+ })
} else {
Err(ExecutionError::General(
"Mismatch between schema and batches".to_string(),
@@ -54,11 +59,13 @@ impl MemTable {
let schema = t.schema();
let partitions = t.scan(&None, 1024 * 1024)?;
- let mut data: Vec<RecordBatch> = vec![];
+ let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
for it in &partitions {
+ let mut partition = vec![];
while let Ok(Some(batch)) = it.lock().unwrap().next() {
- data.push(batch);
+ partition.push(batch);
}
+ data.push(partition);
}
MemTable::new(schema.clone(), data)
@@ -102,47 +109,17 @@ impl TableProvider for MemTable {
let projected_schema = Arc::new(Schema::new(projected_columns?));
- let batches = self
- .batches
+ let exec = MemoryExec::try_new(
+ &self.batches.clone(),
+ projected_schema,
+ projection.clone(),
+ )?;
+ let partitions = exec.partitions()?;
+ let iterators = partitions
.iter()
- .map(|batch| {
- RecordBatch::try_new(
- projected_schema.clone(),
- columns.iter().map(|i| batch.column(*i).clone()).collect(),
- )
- })
- .collect();
-
- match batches {
- Ok(batches) => Ok(vec![Arc::new(Mutex::new(MemBatchIterator {
- schema: projected_schema.clone(),
- index: 0,
- batches,
- }))]),
- Err(e) => Err(ExecutionError::ArrowError(e)),
- }
- }
-}
-
-/// Iterator over an in-memory table
-pub struct MemBatchIterator {
- schema: Arc<Schema>,
- index: usize,
- batches: Vec<RecordBatch>,
-}
-
-impl BatchIterator for MemBatchIterator {
- fn schema(&self) -> Arc<Schema> {
- self.schema.clone()
- }
-
- fn next(&mut self) -> Result<Option<RecordBatch>> {
- if self.index < self.batches.len() {
- self.index += 1;
- Ok(Some(self.batches[self.index - 1].clone()))
- } else {
- Ok(None)
- }
+ .map(|p| p.execute())
+ .collect::<Result<Vec<_>>>()?;
+ Ok(iterators)
}
}
@@ -170,7 +147,7 @@ mod tests {
)
.unwrap();
- let provider = MemTable::new(schema, vec![batch]).unwrap();
+ let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
// scan with projection
let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
@@ -199,7 +176,7 @@ mod tests {
)
.unwrap();
- let provider = MemTable::new(schema, vec![batch]).unwrap();
+ let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
let partitions = provider.scan(&None, 1024).unwrap();
let batch1 = partitions[0].lock().unwrap().next().unwrap().unwrap();
@@ -225,7 +202,7 @@ mod tests {
)
.unwrap();
- let provider = MemTable::new(schema, vec![batch]).unwrap();
+ let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
let projection: Vec<usize> = vec![0, 4];
@@ -261,7 +238,7 @@ mod tests {
)
.unwrap();
- match MemTable::new(schema2, vec![batch]) {
+ match MemTable::new(schema2, vec![vec![batch]]) {
Err(ExecutionError::General(e)) => assert_eq!(
"\"Mismatch between schema and batches\"",
format!("{:?}", e)
diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs
index 0adff45..075c81c 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -24,4 +24,4 @@ pub mod parquet;
pub use self::csv::{CsvBatchIterator, CsvFile};
pub use self::datasource::{ScanResult, TableProvider};
-pub use self::memory::{MemBatchIterator, MemTable};
+pub use self::memory::MemTable;
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index d5684a3..9b8d068 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -41,6 +41,7 @@ use crate::execution::physical_plan::expressions::{
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::limit::LimitExec;
use crate::execution::physical_plan::math_expressions::register_math_functions;
+use crate::execution::physical_plan::memory::MemoryExec;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::parquet::ParquetExec;
use crate::execution::physical_plan::projection::ProjectionExec;
@@ -299,6 +300,16 @@ impl ExecutionContext {
table_name
))),
},
+ LogicalPlan::InMemoryScan {
+ data,
+ schema,
+ projection,
+ ..
+ } => Ok(Arc::new(MemoryExec::try_new(
+ data,
+ Arc::new(schema.as_ref().to_owned()),
+ projection.to_owned(),
+ )?)),
LogicalPlan::CsvScan {
path,
schema,
@@ -897,7 +908,7 @@ mod tests {
let mut ctx = ExecutionContext::new();
- let provider = MemTable::new(Arc::new(schema), vec![batch])?;
+ let provider = MemTable::new(Arc::new(schema), vec![vec![batch]])?;
ctx.register_table("t", Box::new(provider));
let myfunc: ScalarUdf = |args: &[ArrayRef]| {
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
new file mode 100644
index 0000000..b045e9f
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading in-memory batches of data
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::Result;
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for reading in-memory batches of data
+pub struct MemoryExec {
+ /// The partitions to query
+ partitions: Vec<Vec<RecordBatch>>,
+ /// Schema representing the data after the optional projection is applied
+ schema: Arc<Schema>,
+ /// Optional projection
+ projection: Option<Vec<usize>>,
+}
+
+impl ExecutionPlan for MemoryExec {
+ /// Get the schema for this execution plan
+ fn schema(&self) -> Arc<Schema> {
+ self.schema.clone()
+ }
+
+ /// Get the partitions for this execution plan. Each partition can be executed in parallel.
+ fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+ let partitions = self
+ .partitions
+ .iter()
+ .map(|vec| {
+ Arc::new(MemoryPartition::new(
+ vec.clone(),
+ self.schema.clone(),
+ self.projection.clone(),
+ )) as Arc<dyn Partition>
+ })
+ .collect();
+ Ok(partitions)
+ }
+}
+
+impl MemoryExec {
+ /// Create a new execution plan for reading in-memory record batches
+ pub fn try_new(
+ partitions: &Vec<Vec<RecordBatch>>,
+ schema: Arc<Schema>,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Self> {
+ Ok(Self {
+ partitions: partitions.clone(),
+ schema,
+ projection,
+ })
+ }
+}
+
+/// Memory partition
+struct MemoryPartition {
+ /// Vector of record batches
+ data: Vec<RecordBatch>,
+ /// Schema representing the data
+ schema: Arc<Schema>,
+ /// Optional projection
+ projection: Option<Vec<usize>>,
+}
+
+impl MemoryPartition {
+ /// Create a new in-memory partition
+ fn new(
+ data: Vec<RecordBatch>,
+ schema: Arc<Schema>,
+ projection: Option<Vec<usize>>,
+ ) -> Self {
+ Self {
+ data,
+ schema,
+ projection,
+ }
+ }
+}
+
+impl Partition for MemoryPartition {
+ /// Execute this partition and return an iterator over RecordBatch
+ fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+ Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+ self.data.clone(),
+ self.schema.clone(),
+ self.projection.clone(),
+ )?)))
+ }
+}
+
+/// Iterator over batches
+struct MemoryIterator {
+ /// Vector of record batches
+ data: Vec<RecordBatch>,
+ /// Schema representing the data
+ schema: Arc<Schema>,
+ /// Optional projection for which columns to load
+ projection: Option<Vec<usize>>,
+ /// Index into the data
+ index: usize,
+}
+
+impl MemoryIterator {
+ /// Create an iterator for a vector of record batches
+ pub fn try_new(
+ data: Vec<RecordBatch>,
+ schema: Arc<Schema>,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Self> {
+ Ok(Self {
+ data: data.clone(),
+ schema: schema.clone(),
+ projection,
+ index: 0,
+ })
+ }
+}
+
+impl BatchIterator for MemoryIterator {
+ /// Get the schema
+ fn schema(&self) -> Arc<Schema> {
+ self.schema.clone()
+ }
+
+ /// Get the next RecordBatch
+ fn next(&mut self) -> Result<Option<RecordBatch>> {
+ if self.index < self.data.len() {
+ self.index += 1;
+ let batch = &self.data[self.index - 1];
+ // apply projection
+ match &self.projection {
+ Some(columns) => Ok(Some(RecordBatch::try_new(
+ self.schema.clone(),
+ columns.iter().map(|i| batch.column(*i).clone()).collect(),
+ )?)),
+ None => Ok(Some(batch.clone())),
+ }
+ } else {
+ Ok(None)
+ }
+ }
+}
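Since MemoryExec is an ordinary ExecutionPlan, it can also be driven directly, which makes the partition/iterator contract easy to see. A minimal sketch, assuming two hypothetical Vec<RecordBatch> values batches_a and batches_b that share schema, with the ExecutionPlan and Partition traits in scope:

    let exec = MemoryExec::try_new(&vec![batches_a, batches_b], schema.clone(), None).unwrap();
    for partition in exec.partitions().unwrap() {
        // each Arc<dyn Partition> could be drained on its own thread
        let it = partition.execute().unwrap();
        let mut it = it.lock().unwrap();
        while let Ok(Some(batch)) = it.next() {
            assert_eq!(batch.schema().as_ref(), schema.as_ref());
        }
    }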
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 9868b78..9afe9aa 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -92,6 +92,7 @@ pub mod expressions;
pub mod hash_aggregate;
pub mod limit;
pub mod math_expressions;
+pub mod memory;
pub mod merge;
pub mod parquet;
pub mod projection;
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 9dcbeba..fddd60d 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -30,6 +30,7 @@ use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::optimizer::utils;
use crate::sql::parser::FileType;
+use arrow::record_batch::RecordBatch;
/// Enumeration of supported function types (Scalar and Aggregate)
#[derive(Debug, Clone)]
@@ -529,6 +530,17 @@ pub enum LogicalPlan {
/// The projected schema
projected_schema: Box<Schema>,
},
+ /// A table scan against a vector of record batches
+ InMemoryScan {
+ /// Record batch partitions
+ data: Vec<Vec<RecordBatch>>,
+ /// The schema of the record batches
+ schema: Box<Schema>,
+ /// Optional column indices to use as a projection
+ projection: Option<Vec<usize>>,
+ /// The projected schema
+ projected_schema: Box<Schema>,
+ },
/// A table scan against a Parquet data source
ParquetScan {
/// The path to the files
@@ -587,6 +599,9 @@ impl LogicalPlan {
pub fn schema(&self) -> &Box<Schema> {
match self {
LogicalPlan::EmptyRelation { schema } => &schema,
+ LogicalPlan::InMemoryScan {
+ projected_schema, ..
+ } => &projected_schema,
LogicalPlan::CsvScan {
projected_schema, ..
} => &projected_schema,
@@ -621,6 +636,9 @@ impl LogicalPlan {
ref projection,
..
} => write!(f, "TableScan: {} projection={:?}", table_name, projection),
+ LogicalPlan::InMemoryScan { ref projection, .. } => {
+ write!(f, "InMemoryScan: projection={:?}", projection)
+ }
LogicalPlan::CsvScan {
ref path,
ref projection,
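For reference, constructing the new variant by hand looks like the following sketch (schema and batch as in the earlier example; projected_schema simply mirrors schema until the optimizer assigns a projection):

    let plan = LogicalPlan::InMemoryScan {
        data: vec![vec![batch]],
        schema: Box::new(schema.as_ref().clone()),
        projection: None,
        projected_schema: Box::new(schema.as_ref().clone()),
    };
    // the new fmt arm above renders this as: InMemoryScan: projection=None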
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index fab371a..8f614fc 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -114,6 +114,22 @@ impl ProjectionPushDown {
projection: Some(projection),
})
}
+ LogicalPlan::InMemoryScan {
+ data,
+ schema,
+ projection,
+ ..
+ } => {
+ let (projection, projected_schema) =
+ get_projected_schema(&schema, projection, accum, mapping)?;
+
+ Ok(LogicalPlan::InMemoryScan {
+ data: data.clone(),
+ schema: schema.clone(),
+ projection: Some(projection),
+ projected_schema: Box::new(projected_schema),
+ })
+ }
LogicalPlan::CsvScan {
path,
has_header,