You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2019/08/06 16:58:08 UTC

[arrow] branch master updated: ARROW-6088: [Rust] [DataFusion] Projection execution plan

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b0ef1  ARROW-6088: [Rust] [DataFusion] Projection execution plan
d9b0ef1 is described below

commit d9b0ef18149ba7cbee62240adb496b666d73db6a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Aug 6 10:57:54 2019 -0600

    ARROW-6088: [Rust] [DataFusion] Projection execution plan
    
    This PR implements the projection and CSV execution plans (I can split this into two PRs if necessary - one for CSV then one for projection).
    
    Note that while I implement execution plans for each relational operator (projection, selection, aggregate, etc) there will be duplicate implementations because we already have the existing execution code that directly executes the logical plan. Once the new physical plan is in place, I will remove the original execution logic (and translate the logical plan to a physical plan).
    
    Closes #4988 from andygrove/ARROW-6088 and squashes the following commits:
    
    755365c85 <Andy Grove> Rebase and remove unwrap
    fec84aff2 <Andy Grove> test only delete temp path if exist
    8f11c8194 <Andy Grove> save
    6db609fc4 <Andy Grove> test passes
    717dcd87b <Andy Grove> implement mutex for iterator
    abf6d5eb6 <Andy Grove> Save
    a26575e6b <Andy Grove> rough out CSV execution plan
    e806c76fc <Andy Grove> formatting
    768a7ae21 <Andy Grove> Implement Column expression
    d1ede3cae <Andy Grove> Implement projection logix
    18759028d <Andy Grove> Roughing out projection execution plan
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/execution/physical_plan/csv.rs | 197 +++++++++++++++++
 .../src/execution/physical_plan/expressions.rs     |  53 +++++
 .../{physical_plan.rs => physical_plan/mod.rs}     |  25 ++-
 .../src/execution/physical_plan/projection.rs      | 238 +++++++++++++++++++++
 4 files changed, 508 insertions(+), 5 deletions(-)

diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
new file mode 100644
index 0000000..2c7ebc2
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading CSV files
+
+use std::fs;
+use std::fs::File;
+use std::sync::{Arc, Mutex};
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::csv;
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for scanning a CSV file
+pub struct CsvExec {
+    /// Path to directory containing partitioned CSV files with the same schema
+    path: String,
+    /// Schema of the CSV files *before* the optional projection is applied. This is the
+    /// schema handed to the CSV reader, which applies the projection itself.
+    schema: Arc<Schema>,
+    /// Schema representing the CSV files after the optional projection is applied
+    projected_schema: Arc<Schema>,
+    /// Does the CSV file have a header?
+    has_header: bool,
+    /// Optional projection for which columns to load (indices into `schema`)
+    projection: Option<Vec<usize>>,
+    /// Batch size
+    batch_size: usize,
+}
+
+impl ExecutionPlan for CsvExec {
+    /// Get the schema for this execution plan, i.e. the schema after projection
+    fn schema(&self) -> Arc<Schema> {
+        self.projected_schema.clone()
+    }
+
+    /// Get the partitions for this execution plan: one partition per `.csv` file found
+    /// (recursively) under `path`. Each partition can be executed in parallel.
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let mut filenames: Vec<String> = vec![];
+        self.build_file_list(&self.path, &mut filenames)?;
+        // `fs::read_dir` does not guarantee any ordering; sort so that the partition order
+        // is deterministic across runs and platforms
+        filenames.sort();
+        let partitions = filenames
+            .iter()
+            .map(|filename| {
+                // Pass the full file schema (not the projected one): arrow's CSV reader
+                // expects the file schema and applies `projection` to it itself
+                Arc::new(CsvPartition::new(
+                    filename,
+                    self.schema.clone(),
+                    self.has_header,
+                    self.projection.clone(),
+                    self.batch_size,
+                )) as Arc<dyn Partition>
+            })
+            .collect();
+        Ok(partitions)
+    }
+}
+
+impl CsvExec {
+    /// Create a new execution plan for reading a set of CSV files.
+    ///
+    /// `schema` is the schema of the CSV files. `projection`, if present, lists the indices
+    /// (into `schema`) of the columns to load.
+    ///
+    /// Returns an error if any projection index is out of bounds for `schema`.
+    pub fn try_new(
+        path: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let projected_schema = match &projection {
+            Some(p) => {
+                let projected_fields = p
+                    .iter()
+                    .map(|&i| {
+                        schema.fields().get(i).cloned().ok_or_else(|| {
+                            ExecutionError::General(format!(
+                                "Projection index {} is out of bounds for a schema with {} fields",
+                                i,
+                                schema.fields().len()
+                            ))
+                        })
+                    })
+                    .collect::<Result<Vec<Field>>>()?;
+                Arc::new(Schema::new(projected_fields))
+            }
+            None => schema.clone(),
+        };
+
+        Ok(Self {
+            path: path.to_string(),
+            schema,
+            projected_schema,
+            has_header,
+            projection,
+            batch_size,
+        })
+    }
+
+    /// Recursively collect the paths of all files ending in `.csv` under `dir` into
+    /// `filenames`.
+    ///
+    /// Returns an error if a directory cannot be read or a path is not valid UTF-8.
+    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
+        for entry in fs::read_dir(dir)? {
+            let path = entry?.path();
+            let path_name = match path.to_str() {
+                Some(name) => name,
+                None => return Err(ExecutionError::General("Invalid path".to_string())),
+            };
+            if path.is_dir() {
+                self.build_file_list(path_name, filenames)?;
+            } else if path_name.ends_with(".csv") {
+                filenames.push(path_name.to_string());
+            }
+        }
+        Ok(())
+    }
+}
+
+/// A single CSV file, scanned as one partition of a `CsvExec`
+struct CsvPartition {
+    /// Location of the CSV file
+    path: String,
+    /// Schema handed to the CSV reader for this file
+    schema: Arc<Schema>,
+    /// Whether the first line of the file is a header row
+    has_header: bool,
+    /// Indices of the columns to read, or `None` to read every column
+    projection: Option<Vec<usize>>,
+    /// Batch size passed to the CSV reader
+    batch_size: usize,
+}
+
+impl CsvPartition {
+    /// Build a partition description; no file is opened until `execute` is called
+    fn new(
+        path: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Self {
+        let path = String::from(path);
+        CsvPartition {
+            path,
+            schema,
+            has_header,
+            projection,
+            batch_size,
+        }
+    }
+}
+
+impl Partition for CsvPartition {
+    /// Open this partition's file and return an iterator over its RecordBatches
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        let iterator = CsvIterator::try_new(
+            &self.path,
+            Arc::clone(&self.schema),
+            self.has_header,
+            &self.projection,
+            self.batch_size,
+        )?;
+        Ok(Arc::new(Mutex::new(iterator)))
+    }
+}
+
+/// Iterator that yields the RecordBatches read from one CSV file
+struct CsvIterator {
+    /// Arrow CSV reader over the opened file
+    reader: csv::Reader<File>,
+}
+
+impl CsvIterator {
+    /// Open `filename` and wrap it in an Arrow CSV reader configured with the given
+    /// schema, header flag, batch size and optional projection
+    pub fn try_new(
+        filename: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let file = File::open(filename)?;
+        Ok(Self {
+            reader: csv::Reader::new(file, schema, has_header, batch_size, projection.clone()),
+        })
+    }
+}
+
+impl BatchIterator for CsvIterator {
+    /// Read the next RecordBatch from the underlying CSV reader
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        self.reader.next().map_err(Into::into)
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs
new file mode 100644
index 0000000..bd888a3
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use crate::error::Result;
+use crate::execution::physical_plan::PhysicalExpr;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Represents the column at a given index in a RecordBatch
+pub struct Column {
+    /// Zero-based index of the column in the input schema / RecordBatch
+    index: usize,
+}
+
+impl Column {
+    /// Create a new column expression referencing the column at `index`
+    pub fn new(index: usize) -> Self {
+        Self { index }
+    }
+}
+
+impl PhysicalExpr for Column {
+    /// Get the name to use in a schema to represent the result of this expression.
+    ///
+    /// NOTE: this is a positional name (`c{index}`), not the name of the referenced input
+    /// field, because `name` has no access to the input schema.
+    fn name(&self) -> String {
+        format!("c{}", self.index)
+    }
+
+    /// Get the data type of this expression, given the schema of the input.
+    ///
+    /// Returns an error (instead of panicking) if the index is out of bounds for the schema.
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        input_schema
+            .fields()
+            .get(self.index)
+            .map(|field| field.data_type().clone())
+            .ok_or_else(|| {
+                crate::error::ExecutionError::General(format!(
+                    "Column index {} is out of bounds for a schema with {} fields",
+                    self.index,
+                    input_schema.fields().len()
+                ))
+            })
+    }
+
+    /// Evaluate the expression by returning (a clone of) the referenced column's `ArrayRef`.
+    ///
+    /// Returns an error (instead of panicking) if the index is out of bounds for the batch.
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        if self.index >= batch.num_columns() {
+            return Err(crate::error::ExecutionError::General(format!(
+                "Column index {} is out of bounds for a batch with {} columns",
+                self.index,
+                batch.num_columns()
+            )));
+        }
+        Ok(batch.column(self.index).clone())
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
similarity index 68%
rename from rust/datafusion/src/execution/physical_plan.rs
rename to rust/datafusion/src/execution/physical_plan/mod.rs
index ee7c62a..7dae854 100644
--- a/rust/datafusion/src/execution/physical_plan.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -17,11 +17,12 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
-use arrow::datatypes::Schema;
-use arrow::record_batch::RecordBatch;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use crate::error::Result;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
 
 /// Partition-aware execution plan for a relation
 pub trait ExecutionPlan {
@@ -34,11 +35,25 @@ pub trait ExecutionPlan {
 /// Represents a partition of an execution plan that can be executed on a thread
 pub trait Partition: Send + Sync {
     /// Execute this partition and return an iterator over RecordBatch
+    ///
+    /// The iterator is returned behind a `Mutex` because `BatchIterator::next` takes
+    /// `&mut self`, while an `Arc` on its own only provides shared access.
-    fn execute(&self) -> Result<Arc<dyn BatchIterator>>;
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>>;
 }
 
 /// Iterator over RecordBatch that can be sent between threads
 pub trait BatchIterator: Send + Sync {
     /// Get the next RecordBatch
+    ///
+    /// `Ok(None)` signals that the iterator is exhausted. Takes `&mut self` so that
+    /// implementations can advance their internal state (e.g. an underlying reader).
-    fn next(&self) -> Result<Option<RecordBatch>>;
+    fn next(&mut self) -> Result<Option<RecordBatch>>;
 }
+
+/// A physical expression that is evaluated against a RecordBatch to produce one column
+pub trait PhysicalExpr: Send + Sync {
+    /// Name of the field that holds this expression's result in an output schema
+    fn name(&self) -> String;
+
+    /// Data type of this expression's result, resolved against the input schema
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+
+    /// Evaluate this expression against `batch`, producing a single array
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+}
+
+pub mod csv;
+pub mod expressions;
+pub mod projection;
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
new file mode 100644
index 0000000..c384b86
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the projection execution plan. A projection determines which columns or expressions
+//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
+//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
+//! projection expressions.
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::Result;
+use crate::execution::physical_plan::{
+    BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
+};
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for a projection
+pub struct ProjectionExec {
+    /// The projection expressions, evaluated in order to produce the output columns
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    /// The schema once the projection has been applied to the input
+    schema: Arc<Schema>,
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+}
+
+impl ProjectionExec {
+    /// Create a projection on an input.
+    ///
+    /// The output schema has one nullable field per expression, named by
+    /// `PhysicalExpr::name` and typed by `PhysicalExpr::data_type` against the input schema.
+    ///
+    /// Returns an error if the data type of any expression cannot be determined.
+    pub fn try_new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let input_schema = input.schema();
+
+        let fields = expr
+            .iter()
+            .map(|e| Ok(Field::new(&e.name(), e.data_type(&input_schema)?, true)))
+            .collect::<Result<Vec<_>>>()?;
+
+        // `expr` and `input` are owned arguments, so move them in rather than cloning
+        Ok(Self {
+            expr,
+            schema: Arc::new(Schema::new(fields)),
+            input,
+        })
+    }
+}
+
+impl ExecutionPlan for ProjectionExec {
+    /// Get the schema for this execution plan
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Get the partitions for this execution plan: one projection partition wrapping each
+    /// partition of the input plan
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let partitions = self
+            .input
+            .partitions()?
+            .into_iter()
+            .map(|input| {
+                Arc::new(ProjectionPartition {
+                    schema: self.schema.clone(),
+                    expr: self.expr.clone(),
+                    input,
+                }) as Arc<dyn Partition>
+            })
+            .collect();
+
+        Ok(partitions)
+    }
+}
+
+/// One partition of a `ProjectionExec`: wraps a single input partition and applies the
+/// projection expressions to the batches it produces
+struct ProjectionPartition {
+    /// Output schema of the projection
+    schema: Arc<Schema>,
+    /// Projection expressions
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    /// Input partition that supplies the batches to project
+    input: Arc<dyn Partition>,
+}
+
+impl Partition for ProjectionPartition {
+    /// Execute the input partition and wrap its iterator in a projection iterator
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        let schema = Arc::clone(&self.schema);
+        let expr = self.expr.clone();
+        let input = self.input.execute()?;
+        Ok(Arc::new(Mutex::new(ProjectionIterator {
+            schema,
+            expr,
+            input,
+        })))
+    }
+}
+
+/// Iterator that pulls batches from an input iterator and evaluates the projection
+/// expressions against each one
+struct ProjectionIterator {
+    /// Output schema of the projection
+    schema: Arc<Schema>,
+    /// Projection expressions, evaluated in order to produce the output columns
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    /// Iterator over the input partition's batches
+    input: Arc<Mutex<dyn BatchIterator>>,
+}
+
+impl BatchIterator for ProjectionIterator {
+    /// Get the next projected batch, or `None` once the input returns `None`.
+    ///
+    /// Returns an error if the input mutex is poisoned, the input iterator fails, an
+    /// expression fails to evaluate, or the projected batch cannot be built for the schema.
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        // Hold the input lock only while pulling the next batch; release it before
+        // evaluating the expressions, which may be expensive
+        let next_batch = {
+            let mut input = self.input.lock().map_err(|_| {
+                crate::error::ExecutionError::General(
+                    "Projection input iterator mutex is poisoned".to_string(),
+                )
+            })?;
+            input.next()?
+        };
+
+        match next_batch {
+            Some(batch) => {
+                let arrays = self
+                    .expr
+                    .iter()
+                    .map(|expr| expr.evaluate(&batch))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(Some(RecordBatch::try_new(self.schema.clone(), arrays)?))
+            }
+            None => Ok(None),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::error::ExecutionError;
+    use crate::execution::physical_plan::csv::CsvExec;
+    use crate::execution::physical_plan::expressions::Column;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::env;
+    use std::fs;
+    use std::fs::File;
+    use std::io::prelude::*;
+    use std::io::{BufReader, BufWriter};
+
+    #[test]
+    fn project_first_column() -> Result<()> {
+        // Schema of the `aggregate_test_100.csv` test file: thirteen columns c1..c13
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, false),
+            Field::new("c2", DataType::UInt32, false),
+            Field::new("c3", DataType::Int8, false),
+            Field::new("c4", DataType::Int16, false),
+            Field::new("c5", DataType::Int32, false),
+            Field::new("c6", DataType::Int64, false),
+            Field::new("c7", DataType::UInt8, false),
+            Field::new("c8", DataType::UInt16, false),
+            Field::new("c9", DataType::UInt32, false),
+            Field::new("c10", DataType::UInt64, false),
+            Field::new("c11", DataType::Float32, false),
+            Field::new("c12", DataType::Float64, false),
+            Field::new("c13", DataType::Utf8, false),
+        ]));
+
+        let partitions = 4;
+        let path = create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+
+        let csv = CsvExec::try_new(&path, schema, true, None, 1024)?;
+
+        let projection =
+            ProjectionExec::try_new(vec![Arc::new(Column::new(0))], Arc::new(csv))?;
+
+        // The projected schema holds only the first (Utf8) column
+        assert_eq!(1, projection.schema().fields().len());
+        assert_eq!(&DataType::Utf8, projection.schema().field(0).data_type());
+
+        let mut partition_count = 0;
+        let mut row_count = 0;
+        for partition in projection.partitions()? {
+            partition_count += 1;
+            let iterator = partition.execute()?;
+            let mut iterator = iterator.lock().unwrap();
+            while let Some(batch) = iterator.next()? {
+                assert_eq!(1, batch.num_columns());
+                row_count += batch.num_rows();
+            }
+        }
+        assert_eq!(partitions, partition_count);
+        assert_eq!(100, row_count);
+
+        Ok(())
+    }
+
+    /// Generate a partitioned copy of a CSV file from the `ARROW_TEST_DATA` directory.
+    ///
+    /// The header line is copied into every partition. The data lines are distributed
+    /// round-robin across `partitions` files named `part{i}.csv` inside a freshly created
+    /// directory under the system temp dir. Returns the path of that directory.
+    fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<String> {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let path = format!("{}/csv/{}", testdata, filename);
+
+        let mut dir = env::temp_dir();
+        dir.push(&format!("{}-{}", filename, partitions));
+
+        // Start from an empty directory so stale files from earlier runs are not picked up
+        if dir.exists() {
+            fs::remove_dir_all(&dir)?;
+        }
+        fs::create_dir(&dir)?;
+
+        let mut writers = (0..partitions)
+            .map(|i| File::create(dir.join(format!("part{}.csv", i))).map(BufWriter::new))
+            .collect::<std::io::Result<Vec<_>>>()?;
+
+        let reader = BufReader::new(File::open(&path)?);
+        for (i, line) in reader.lines().enumerate() {
+            let line = line?;
+            if i == 0 {
+                // write header to all partitions
+                for w in writers.iter_mut() {
+                    writeln!(w, "{}", line)?;
+                }
+            } else {
+                // write data line to a single partition
+                writeln!(writers[i % partitions], "{}", line)?;
+            }
+        }
+        // BufWriter's Drop swallows flush errors, so flush explicitly
+        for w in writers.iter_mut() {
+            w.flush()?;
+        }
+
+        dir.to_str()
+            .map(|s| s.to_string())
+            .ok_or_else(|| ExecutionError::General("Invalid temp dir path".to_string()))
+    }
+}