You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/03/28 18:07:29 UTC
[arrow] branch master updated: ARROW-8249: [Rust] [DataFusion] Table API now uses LogicalPlanBuilder
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c49b960 ARROW-8249: [Rust] [DataFusion] Table API now uses LogicalPlanBuilder
c49b960 is described below
commit c49b960d2b697135c8de45222c6377e427ba8aad
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sat Mar 28 12:07:05 2020 -0600
ARROW-8249: [Rust] [DataFusion] Table API now uses LogicalPlanBuilder
Table API now uses LogicalPlanBuilder for more concise and consistent code.
Closes #6748 from andygrove/ARROW-8249
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/examples/memory_table_api.rs | 25 +++---
rust/datafusion/src/execution/context.rs | 7 +-
rust/datafusion/src/execution/table_impl.rs | 122 ++++++++-------------------
rust/datafusion/src/table.rs | 7 +-
4 files changed, 50 insertions(+), 111 deletions(-)
diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 9fa218c..cf42264 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -26,11 +26,12 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
+use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::logicalplan::{Expr, ScalarValue};
/// This example demonstrates basic uses of the Table API on an in-memory table
-fn main() {
+fn main() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
@@ -44,31 +45,23 @@ fn main() {
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
],
- )
- .unwrap();
+ )?;
// declare a new context. In spark API, this corresponds to a new spark SQLsession
let mut ctx = ExecutionContext::new();
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
- let provider = MemTable::new(schema, vec![batch]).unwrap();
+ let provider = MemTable::new(schema, vec![batch])?;
ctx.register_table("t", Box::new(provider));
- let t = ctx.table("t").unwrap();
+ let t = ctx.table("t")?;
// construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
- let filter = t
- .col("b")
- .unwrap()
- .eq(&Expr::Literal(ScalarValue::Int32(10)));
+ let filter = t.col("b")?.eq(&Expr::Literal(ScalarValue::Int32(10)));
- let t = t
- .select_columns(vec!["a", "b"])
- .unwrap()
- .filter(filter)
- .unwrap();
+ let t = t.select_columns(vec!["a", "b"])?.filter(filter)?;
// execute
- let results = t.collect(&mut ctx, 10).unwrap();
+ let results = t.collect(&mut ctx, 10)?;
// print results
results.iter().for_each(|batch| {
@@ -94,4 +87,6 @@ fn main() {
println!("{}, {}", c1.value(i), c2.value(i),);
}
});
+
+ Ok(())
}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 47608e3..063f5bb 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -216,13 +216,16 @@ impl ExecutionContext {
pub fn table(&mut self, table_name: &str) -> Result<Arc<dyn Table>> {
match self.datasources.get(table_name) {
Some(provider) => {
- Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan {
+ let table_scan = LogicalPlan::TableScan {
schema_name: "".to_string(),
table_name: table_name.to_string(),
table_schema: provider.schema().clone(),
projected_schema: provider.schema().clone(),
projection: None,
- }))))
+ };
+ Ok(Arc::new(TableImpl::new(
+ &LogicalPlanBuilder::from(&table_scan).build()?,
+ )))
}
_ => Err(ExecutionError::General(format!(
"No table named '{}'",
diff --git a/rust/datafusion/src/execution/table_impl.rs b/rust/datafusion/src/execution/table_impl.rs
index e56cd76..10d65a4 100644
--- a/rust/datafusion/src/execution/table_impl.rs
+++ b/rust/datafusion/src/execution/table_impl.rs
@@ -19,72 +19,54 @@
use std::sync::Arc;
-use crate::arrow::datatypes::{DataType, Field, Schema};
+use crate::arrow::datatypes::DataType;
use crate::arrow::record_batch::RecordBatch;
use crate::error::{ExecutionError, Result};
use crate::execution::context::ExecutionContext;
-use crate::logicalplan::Expr::Literal;
-use crate::logicalplan::ScalarValue;
use crate::logicalplan::{Expr, LogicalPlan};
+use crate::logicalplan::{LogicalPlanBuilder, ScalarValue};
use crate::table::*;
/// Implementation of Table API
pub struct TableImpl {
- plan: Arc<LogicalPlan>,
+ plan: LogicalPlan,
}
impl TableImpl {
/// Create a new Table based on an existing logical plan
- pub fn new(plan: Arc<LogicalPlan>) -> Self {
- Self { plan }
+ pub fn new(plan: &LogicalPlan) -> Self {
+ Self { plan: plan.clone() }
}
}
impl Table for TableImpl {
/// Apply a projection based on a list of column names
fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn Table>> {
- let mut expr: Vec<Expr> = Vec::with_capacity(columns.len());
- for column_name in columns {
- let i = self.column_index(column_name)?;
- expr.push(Expr::Column(i));
- }
- self.select(expr)
+ let exprs = columns
+ .iter()
+ .map(|name| {
+ self.plan
+ .schema()
+ .index_of(name.to_owned())
+ .and_then(|i| Ok(Expr::Column(i)))
+ .map_err(|e| e.into())
+ })
+ .collect::<Result<Vec<_>>>()?;
+ self.select(exprs)
}
/// Create a projection based on arbitrary expressions
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn Table>> {
- let schema = self.plan.schema();
- let mut field: Vec<Field> = Vec::with_capacity(expr_list.len());
-
- for expr in &expr_list {
- match expr {
- Expr::Column(i) => {
- field.push(schema.field(*i).clone());
- }
- other => {
- return Err(ExecutionError::NotImplemented(format!(
- "Expr {:?} is not currently supported in this context",
- other
- )))
- }
- }
- }
-
- Ok(Arc::new(TableImpl::new(Arc::new(
- LogicalPlan::Projection {
- expr: expr_list.clone(),
- input: self.plan.clone(),
- schema: Arc::new(Schema::new(field)),
- },
- ))))
+ let plan = LogicalPlanBuilder::from(&self.plan)
+ .project(expr_list)?
+ .build()?;
+ Ok(Arc::new(TableImpl::new(&plan)))
}
/// Create a selection based on a filter expression
fn filter(&self, expr: Expr) -> Result<Arc<dyn Table>> {
- Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::Selection {
- expr,
- input: self.plan.clone(),
- }))))
+ let plan = LogicalPlanBuilder::from(&self.plan).filter(expr)?.build()?;
+ Ok(Arc::new(TableImpl::new(&plan)))
}
/// Perform an aggregate query
@@ -93,38 +75,23 @@ impl Table for TableImpl {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn Table>> {
- Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::Aggregate {
- input: self.plan.clone(),
- group_expr,
- aggr_expr,
- schema: Arc::new(Schema::new(vec![])),
- }))))
+ let plan = LogicalPlanBuilder::from(&self.plan)
+ .aggregate(group_expr, aggr_expr)?
+ .build()?;
+ Ok(Arc::new(TableImpl::new(&plan)))
}
/// Limit the number of rows
- fn limit(&self, n: usize) -> Result<Arc<dyn Table>> {
- Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::Limit {
- expr: Literal(ScalarValue::UInt32(n as u32)),
- input: self.plan.clone(),
- schema: self.plan.schema().clone(),
- }))))
+ fn limit(&self, n: u32) -> Result<Arc<dyn Table>> {
+ let plan = LogicalPlanBuilder::from(&self.plan)
+ .limit(Expr::Literal(ScalarValue::UInt32(n)))?
+ .build()?;
+ Ok(Arc::new(TableImpl::new(&plan)))
}
/// Return an expression representing a column within this table
fn col(&self, name: &str) -> Result<Expr> {
- Ok(Expr::Column(self.column_index(name)?))
- }
-
- /// Return the index of a column within this table's schema
- fn column_index(&self, name: &str) -> Result<usize> {
- let schema = self.plan.schema();
- match schema.column_with_name(name) {
- Some((i, _)) => Ok(i),
- _ => Err(ExecutionError::InvalidColumn(format!(
- "No column named '{}'",
- name
- ))),
- }
+ Ok(Expr::Column(self.plan.schema().index_of(name)?))
}
/// Create an expression to represent the min() aggregate function
@@ -153,7 +120,7 @@ impl Table for TableImpl {
}
/// Convert to logical plan
- fn to_logical_plan(&self) -> Arc<LogicalPlan> {
+ fn to_logical_plan(&self) -> LogicalPlan {
self.plan.clone()
}
@@ -196,14 +163,6 @@ mod tests {
use crate::test;
#[test]
- fn column_index() {
- let t = test_table();
- assert_eq!(0, t.column_index("c1").unwrap());
- assert_eq!(1, t.column_index("c2").unwrap());
- assert_eq!(12, t.column_index("c13").unwrap());
- }
-
- #[test]
fn select_columns() -> Result<()> {
// build plan using Table API
let t = test_table();
@@ -236,21 +195,6 @@ mod tests {
}
#[test]
- fn select_invalid_column() -> Result<()> {
- let t = test_table();
-
- match t.col("invalid_column_name") {
- Ok(_) => panic!(),
- Err(e) => assert_eq!(
- "InvalidColumn(\"No column named \\\'invalid_column_name\\\'\")",
- format!("{:?}", e)
- ),
- }
-
- Ok(())
- }
-
- #[test]
fn aggregate() -> Result<()> {
// build plan using Table API
let t = test_table();
diff --git a/rust/datafusion/src/table.rs b/rust/datafusion/src/table.rs
index 37c86f6..c60c1df 100644
--- a/rust/datafusion/src/table.rs
+++ b/rust/datafusion/src/table.rs
@@ -43,10 +43,10 @@ pub trait Table {
) -> Result<Arc<dyn Table>>;
/// limit the number of rows
- fn limit(&self, n: usize) -> Result<Arc<dyn Table>>;
+ fn limit(&self, n: u32) -> Result<Arc<dyn Table>>;
/// Return the logical plan
- fn to_logical_plan(&self) -> Arc<LogicalPlan>;
+ fn to_logical_plan(&self) -> LogicalPlan;
/// Return an expression representing a column within this table
fn col(&self, name: &str) -> Result<Expr>;
@@ -66,9 +66,6 @@ pub trait Table {
/// Create an expression to represent the count() aggregate function
fn count(&self, expr: &Expr) -> Result<Expr>;
- /// Return the index of a column within this table's schema
- fn column_index(&self, name: &str) -> Result<usize>;
-
/// Collects the result as a vector of RecordBatch.
fn collect(
&self,