You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/05/19 13:24:11 UTC

[arrow] branch master updated: ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fc6f53  ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan
8fc6f53 is described below

commit 8fc6f53e71135c03883e9d32957cae4f59aa6e34
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue May 19 07:23:42 2020 -0600

    ARROW-8822: [Rust] [DataFusion] Add InMemoryScan to LogicalPlan
    
    Add InMemoryScan to LogicalPlan. I forgot to add this one when adding CsvScan and ParquetScan and this one turned out to be a larger change because the memory data source was a bit neglected.
    
    This PR introduces a new Physical Plan for reading partitioned in-memory data sources and updates the MemTable to delegate to it, adding support for partitioning as a side-effect.
    
    Closes #7203 from andygrove/ARROW-8822
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/examples/memory_table_api.rs       |   2 +-
 rust/datafusion/src/datasource/memory.rs           |  87 ++++-------
 rust/datafusion/src/datasource/mod.rs              |   2 +-
 rust/datafusion/src/execution/context.rs           |  13 +-
 .../src/execution/physical_plan/memory.rs          | 162 +++++++++++++++++++++
 rust/datafusion/src/execution/physical_plan/mod.rs |   1 +
 rust/datafusion/src/logicalplan.rs                 |  18 +++
 .../src/optimizer/projection_push_down.rs          |  16 ++
 8 files changed, 243 insertions(+), 58 deletions(-)

diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 2a69fe9..bfa8612 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -49,7 +49,7 @@ fn main() -> Result<()> {
     let mut ctx = ExecutionContext::new();
 
     // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
-    let provider = MemTable::new(schema, vec![batch])?;
+    let provider = MemTable::new(schema, vec![vec![batch]])?;
     ctx.register_table("t", Box::new(provider));
     let t = ctx.table("t")?;
 
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 787aa3b..53ca7e9 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -19,29 +19,34 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and then
 //! repeatedly queried without incurring additional file I/O overhead.
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 use crate::datasource::{ScanResult, TableProvider};
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::BatchIterator;
+use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::ExecutionPlan;
 
 /// In-memory table
 pub struct MemTable {
     schema: Arc<Schema>,
-    batches: Vec<RecordBatch>,
+    batches: Vec<Vec<RecordBatch>>,
 }
 
 impl MemTable {
     /// Create a new in-memory table from the provided schema and record batches
-    pub fn new(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<Self> {
-        if batches
-            .iter()
-            .all(|batch| batch.schema().as_ref() == schema.as_ref())
-        {
-            Ok(Self { schema, batches })
+    pub fn new(schema: Arc<Schema>, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
+        if partitions.iter().all(|partition| {
+            partition
+                .iter()
+                .all(|batches| batches.schema().as_ref() == schema.as_ref())
+        }) {
+            Ok(Self {
+                schema,
+                batches: partitions,
+            })
         } else {
             Err(ExecutionError::General(
                 "Mismatch between schema and batches".to_string(),
@@ -54,11 +59,13 @@ impl MemTable {
         let schema = t.schema();
         let partitions = t.scan(&None, 1024 * 1024)?;
 
-        let mut data: Vec<RecordBatch> = vec![];
+        let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
         for it in &partitions {
+            let mut partition = vec![];
             while let Ok(Some(batch)) = it.lock().unwrap().next() {
-                data.push(batch);
+                partition.push(batch);
             }
+            data.push(partition);
         }
 
         MemTable::new(schema.clone(), data)
@@ -102,47 +109,17 @@ impl TableProvider for MemTable {
 
         let projected_schema = Arc::new(Schema::new(projected_columns?));
 
-        let batches = self
-            .batches
+        let exec = MemoryExec::try_new(
+            &self.batches.clone(),
+            projected_schema,
+            projection.clone(),
+        )?;
+        let partitions = exec.partitions()?;
+        let iterators = partitions
             .iter()
-            .map(|batch| {
-                RecordBatch::try_new(
-                    projected_schema.clone(),
-                    columns.iter().map(|i| batch.column(*i).clone()).collect(),
-                )
-            })
-            .collect();
-
-        match batches {
-            Ok(batches) => Ok(vec![Arc::new(Mutex::new(MemBatchIterator {
-                schema: projected_schema.clone(),
-                index: 0,
-                batches,
-            }))]),
-            Err(e) => Err(ExecutionError::ArrowError(e)),
-        }
-    }
-}
-
-/// Iterator over an in-memory table
-pub struct MemBatchIterator {
-    schema: Arc<Schema>,
-    index: usize,
-    batches: Vec<RecordBatch>,
-}
-
-impl BatchIterator for MemBatchIterator {
-    fn schema(&self) -> Arc<Schema> {
-        self.schema.clone()
-    }
-
-    fn next(&mut self) -> Result<Option<RecordBatch>> {
-        if self.index < self.batches.len() {
-            self.index += 1;
-            Ok(Some(self.batches[self.index - 1].clone()))
-        } else {
-            Ok(None)
-        }
+            .map(|p| p.execute())
+            .collect::<Result<Vec<_>>>()?;
+        Ok(iterators)
     }
 }
 
@@ -170,7 +147,7 @@ mod tests {
         )
         .unwrap();
 
-        let provider = MemTable::new(schema, vec![batch]).unwrap();
+        let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
 
         // scan with projection
         let partitions = provider.scan(&Some(vec![2, 1]), 1024).unwrap();
@@ -199,7 +176,7 @@ mod tests {
         )
         .unwrap();
 
-        let provider = MemTable::new(schema, vec![batch]).unwrap();
+        let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
 
         let partitions = provider.scan(&None, 1024).unwrap();
         let batch1 = partitions[0].lock().unwrap().next().unwrap().unwrap();
@@ -225,7 +202,7 @@ mod tests {
         )
         .unwrap();
 
-        let provider = MemTable::new(schema, vec![batch]).unwrap();
+        let provider = MemTable::new(schema, vec![vec![batch]]).unwrap();
 
         let projection: Vec<usize> = vec![0, 4];
 
@@ -261,7 +238,7 @@ mod tests {
         )
         .unwrap();
 
-        match MemTable::new(schema2, vec![batch]) {
+        match MemTable::new(schema2, vec![vec![batch]]) {
             Err(ExecutionError::General(e)) => assert_eq!(
                 "\"Mismatch between schema and batches\"",
                 format!("{:?}", e)
diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs
index 0adff45..075c81c 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -24,4 +24,4 @@ pub mod parquet;
 
 pub use self::csv::{CsvBatchIterator, CsvFile};
 pub use self::datasource::{ScanResult, TableProvider};
-pub use self::memory::{MemBatchIterator, MemTable};
+pub use self::memory::MemTable;
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index d5684a3..9b8d068 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -41,6 +41,7 @@ use crate::execution::physical_plan::expressions::{
 use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
 use crate::execution::physical_plan::limit::LimitExec;
 use crate::execution::physical_plan::math_expressions::register_math_functions;
+use crate::execution::physical_plan::memory::MemoryExec;
 use crate::execution::physical_plan::merge::MergeExec;
 use crate::execution::physical_plan::parquet::ParquetExec;
 use crate::execution::physical_plan::projection::ProjectionExec;
@@ -299,6 +300,16 @@ impl ExecutionContext {
                     table_name
                 ))),
             },
+            LogicalPlan::InMemoryScan {
+                data,
+                schema,
+                projection,
+                ..
+            } => Ok(Arc::new(MemoryExec::try_new(
+                data,
+                Arc::new(schema.as_ref().to_owned()),
+                projection.to_owned(),
+            )?)),
             LogicalPlan::CsvScan {
                 path,
                 schema,
@@ -897,7 +908,7 @@ mod tests {
 
         let mut ctx = ExecutionContext::new();
 
-        let provider = MemTable::new(Arc::new(schema), vec![batch])?;
+        let provider = MemTable::new(Arc::new(schema), vec![vec![batch]])?;
         ctx.register_table("t", Box::new(provider));
 
         let myfunc: ScalarUdf = |args: &[ArrayRef]| {
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
new file mode 100644
index 0000000..b045e9f
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading in-memory batches of data
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::Result;
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for reading in-memory batches of data
+pub struct MemoryExec {
+    /// The partitions to query
+    partitions: Vec<Vec<RecordBatch>>,
+    /// Schema representing the data after the optional projection is applied
+    schema: Arc<Schema>,
+    /// Optional projection
+    projection: Option<Vec<usize>>,
+}
+
+impl ExecutionPlan for MemoryExec {
+    /// Get the schema for this execution plan
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let partitions = self
+            .partitions
+            .iter()
+            .map(|vec| {
+                Arc::new(MemoryPartition::new(
+                    vec.clone(),
+                    self.schema.clone(),
+                    self.projection.clone(),
+                )) as Arc<dyn Partition>
+            })
+            .collect();
+        Ok(partitions)
+    }
+}
+
+impl MemoryExec {
+    /// Create a new execution plan for reading in-memory record batches
+    pub fn try_new(
+        partitions: &Vec<Vec<RecordBatch>>,
+        schema: Arc<Schema>,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            partitions: partitions.clone(),
+            schema,
+            projection,
+        })
+    }
+}
+
+/// Memory partition
+struct MemoryPartition {
+    /// Vector of record batches
+    data: Vec<RecordBatch>,
+    /// Schema representing the data
+    schema: Arc<Schema>,
+    /// Optional projection
+    projection: Option<Vec<usize>>,
+}
+
+impl MemoryPartition {
+    /// Create a new in-memory partition
+    fn new(
+        data: Vec<RecordBatch>,
+        schema: Arc<Schema>,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
+        Self {
+            data,
+            schema,
+            projection,
+        }
+    }
+}
+
+impl Partition for MemoryPartition {
+    /// Execute this partition and return an iterator over RecordBatch
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+            self.data.clone(),
+            self.schema.clone(),
+            self.projection.clone(),
+        )?)))
+    }
+}
+
+/// Iterator over batches
+struct MemoryIterator {
+    /// Vector of record batches
+    data: Vec<RecordBatch>,
+    /// Schema representing the data
+    schema: Arc<Schema>,
+    /// Optional projection for which columns to load
+    projection: Option<Vec<usize>>,
+    /// Index into the data
+    index: usize,
+}
+
+impl MemoryIterator {
+    /// Create an iterator for a vector of record batches
+    pub fn try_new(
+        data: Vec<RecordBatch>,
+        schema: Arc<Schema>,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            data: data.clone(),
+            schema: schema.clone(),
+            projection,
+            index: 0,
+        })
+    }
+}
+
+impl BatchIterator for MemoryIterator {
+    /// Get the schema
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Get the next RecordBatch
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        if self.index < self.data.len() {
+            self.index += 1;
+            let batch = &self.data[self.index - 1];
+            // apply projection
+            match &self.projection {
+                Some(columns) => Ok(Some(RecordBatch::try_new(
+                    self.schema.clone(),
+                    columns.iter().map(|i| batch.column(*i).clone()).collect(),
+                )?)),
+                None => Ok(Some(batch.clone())),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 9868b78..9afe9aa 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -92,6 +92,7 @@ pub mod expressions;
 pub mod hash_aggregate;
 pub mod limit;
 pub mod math_expressions;
+pub mod memory;
 pub mod merge;
 pub mod parquet;
 pub mod projection;
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 9dcbeba..fddd60d 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -30,6 +30,7 @@ use crate::datasource::TableProvider;
 use crate::error::{ExecutionError, Result};
 use crate::optimizer::utils;
 use crate::sql::parser::FileType;
+use arrow::record_batch::RecordBatch;
 
 /// Enumeration of supported function types (Scalar and Aggregate)
 #[derive(Debug, Clone)]
@@ -529,6 +530,17 @@ pub enum LogicalPlan {
         /// The projected schema
         projected_schema: Box<Schema>,
     },
+    /// A table scan against a vector of record batches
+    InMemoryScan {
+        /// Record batch partitions
+        data: Vec<Vec<RecordBatch>>,
+        /// The schema of the record batches
+        schema: Box<Schema>,
+        /// Optional column indices to use as a projection
+        projection: Option<Vec<usize>>,
+        /// The projected schema
+        projected_schema: Box<Schema>,
+    },
     /// A table scan against a Parquet data source
     ParquetScan {
         /// The path to the files
@@ -587,6 +599,9 @@ impl LogicalPlan {
     pub fn schema(&self) -> &Box<Schema> {
         match self {
             LogicalPlan::EmptyRelation { schema } => &schema,
+            LogicalPlan::InMemoryScan {
+                projected_schema, ..
+            } => &projected_schema,
             LogicalPlan::CsvScan {
                 projected_schema, ..
             } => &projected_schema,
@@ -621,6 +636,9 @@ impl LogicalPlan {
                 ref projection,
                 ..
             } => write!(f, "TableScan: {} projection={:?}", table_name, projection),
+            LogicalPlan::InMemoryScan { ref projection, .. } => {
+                write!(f, "InMemoryScan: projection={:?}", projection)
+            }
             LogicalPlan::CsvScan {
                 ref path,
                 ref projection,
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index fab371a..8f614fc 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -114,6 +114,22 @@ impl ProjectionPushDown {
                     projection: Some(projection),
                 })
             }
+            LogicalPlan::InMemoryScan {
+                data,
+                schema,
+                projection,
+                ..
+            } => {
+                let (projection, projected_schema) =
+                    get_projected_schema(&schema, projection, accum, mapping)?;
+
+                Ok(LogicalPlan::InMemoryScan {
+                    data: data.clone(),
+                    schema: schema.clone(),
+                    projection: Some(projection),
+                    projected_schema: Box::new(projected_schema),
+                })
+            }
             LogicalPlan::CsvScan {
                 path,
                 has_header,