Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/16 16:21:52 UTC

[arrow] branch master updated: ARROW-9742: [Rust] [DataFusion] Improved DataFrame trait (formerly known as the Table trait)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 613ab4a  ARROW-9742: [Rust] [DataFusion] Improved DataFrame trait (formerly known as the Table trait)
613ab4a is described below

commit 613ab4a7ba2c563bb68dbe8d5a1ca215d89e2e49
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Aug 16 10:20:58 2020 -0600

    ARROW-9742: [Rust] [DataFusion] Improved DataFrame trait (formerly known as the Table trait)
    
    This PR renames the `Table` trait to `DataFrame` and also contains the following improvements:
    
    - Updates the `collect` method signature to remove the reference to an execution context (which was redundant, since the table/dataframe struct is created by the context in the first place).
    - Adds rustdocs for DataFrame and ExecutionContext with code examples
    - Updates existing examples to call `ExecutionContext.sql()` instead of referencing logical and physical query plans - these are internal implementation details that should be of no concern to typical users
    - Adds a new example that uses the DataFrame trait to build a query
    - A few other minor UX improvements so that I could show some real examples
    
    Note that the goal of this PR is mostly to document the API as it is today. There are a number of design flaws that still need fixing, and I will be creating follow-up PRs for those.
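    
    To illustrate, here is a quick sketch of how the renamed API reads, based on the
    rustdoc examples added in this commit (it assumes the `tests/example.csv` file with
    columns `a`, `b`, and `c` that is also added here):
    
    ```
    use arrow::util::pretty;
    use datafusion::error::Result;
    use datafusion::execution::physical_plan::csv::CsvReadOptions;
    use datafusion::logicalplan::col;
    use datafusion::ExecutionContext;
    
    fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
    
        // build the query using the DataFrame trait
        let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
        let df = df
            .filter(col("a").lt_eq(col("b")))?
            .aggregate(vec![col("a")], vec![df.min(col("b"))?])?
            .limit(100)?;
    
        // execute; `collect` is now called on the DataFrame itself, where it
        // previously required a reference to the context: `collect(&mut ctx, n)`
        let results = df.collect(4096)?;
        pretty::print_batches(&results)?;
    
        // the equivalent query expressed via `ExecutionContext.sql()`
        ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
        let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100", 4096)?;
        pretty::print_batches(&results)?;
        Ok(())
    }
    ```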
    
    Closes #7972 from andygrove/dataframe
    
    Lead-authored-by: Andy Grove <an...@gmail.com>
    Co-authored-by: Andy Grove <an...@users.noreply.github.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/examples/csv_sql.rs                |  12 +-
 .../examples/{parquet_sql.rs => dataframe.rs}      |  26 +-
 rust/datafusion/examples/memory_table_api.rs       |  10 +-
 rust/datafusion/examples/parquet_sql.rs            |  11 +-
 rust/datafusion/src/dataframe.rs                   | 168 +++++++++++
 rust/datafusion/src/execution/context.rs           | 332 +++++++++++++++------
 .../execution/{table_impl.rs => dataframe_impl.rs} |  88 +++---
 rust/datafusion/src/execution/mod.rs               |   2 +-
 rust/datafusion/src/lib.rs                         |   4 +-
 rust/datafusion/src/logicalplan.rs                 |  85 +++---
 rust/datafusion/src/optimizer/type_coercion.rs     |  28 +-
 rust/datafusion/src/table.rs                       |  79 -----
 rust/datafusion/tests/example.csv                  |   2 +
 13 files changed, 546 insertions(+), 301 deletions(-)

diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index 7300bc8..df17d44 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -19,7 +19,7 @@ use arrow::util::pretty;
 
 use datafusion::datasource::csv::CsvReadOptions;
 use datafusion::error::Result;
-use datafusion::execution::context::ExecutionContext;
+use datafusion::ExecutionContext;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results
@@ -36,15 +36,9 @@ fn main() -> Result<()> {
         CsvReadOptions::new(),
     )?;
 
-    let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1";
-
-    // create the query plan
-    let plan = ctx.create_logical_plan(sql)?;
-    let plan = ctx.optimize(&plan)?;
-    let plan = ctx.create_physical_plan(&plan, 1024 * 1024)?;
-
     // execute the query
-    let results = ctx.collect(plan.as_ref())?;
+    let batch_size = 4096;
+    let results = ctx.sql("SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1", batch_size)?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/dataframe.rs
similarity index 66%
copy from rust/datafusion/examples/parquet_sql.rs
copy to rust/datafusion/examples/dataframe.rs
index 6e635dc..df9a09b 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/dataframe.rs
@@ -17,10 +17,11 @@
 
 use arrow::util::pretty;
 use datafusion::error::Result;
-use datafusion::execution::context::ExecutionContext;
+use datafusion::logicalplan::{col, lit};
+use datafusion::ExecutionContext;
 
 /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
-/// fetching results
+/// fetching results, using the DataFrame trait
 fn main() -> Result<()> {
     // create local execution context
     let mut ctx = ExecutionContext::new();
@@ -28,22 +29,17 @@ fn main() -> Result<()> {
     let testdata =
         std::env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
 
-    // register parquet file with the execution context
-    ctx.register_parquet(
-        "alltypes_plain",
-        &format!("{}/alltypes_plain.parquet", testdata),
-    )?;
+    let filename = &format!("{}/alltypes_plain.parquet", testdata);
 
-    // simple selection
-    let sql = "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col";
-
-    // create the query plan
-    let plan = ctx.create_logical_plan(&sql)?;
-    let plan = ctx.optimize(&plan)?;
-    let plan = ctx.create_physical_plan(&plan, 1024 * 1024)?;
+    // define the query using the DataFrame trait
+    let df = ctx
+        .read_parquet(filename)?
+        .filter(col("id").gt(lit(1)))?
+        .filter(col("tinyint_col").lt(col("tinyint_col")))?;
 
     // execute the query
-    let results = ctx.collect(plan.as_ref())?;
+    let batch_size = 4096;
+    let results = df.collect(batch_size)?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 937d80e..9f6c057 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -26,7 +26,7 @@ use arrow::util::pretty;
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
-use datafusion::logicalplan::lit;
+use datafusion::logicalplan::{col, lit};
 
 /// This example demonstrates basic uses of the Table API on an in-memory table
 fn main() -> Result<()> {
@@ -51,15 +51,15 @@ fn main() -> Result<()> {
     // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
     let provider = MemTable::new(schema, vec![vec![batch]])?;
     ctx.register_table("t", Box::new(provider));
-    let t = ctx.table("t")?;
+    let df = ctx.table("t")?;
 
     // construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
-    let filter = t.col("b")?.eq(&lit(10));
+    let filter = col("b").eq(lit(10));
 
-    let t = t.select_columns(vec!["a", "b"])?.filter(filter)?;
+    let df = df.select_columns(vec!["a", "b"])?.filter(filter)?;
 
     // execute
-    let results = t.collect(&mut ctx, 10)?;
+    let results = df.collect(10)?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index 6e635dc..d05cf26 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -34,16 +34,9 @@ fn main() -> Result<()> {
         &format!("{}/alltypes_plain.parquet", testdata),
     )?;
 
-    // simple selection
-    let sql = "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col";
-
-    // create the query plan
-    let plan = ctx.create_logical_plan(&sql)?;
-    let plan = ctx.optimize(&plan)?;
-    let plan = ctx.create_physical_plan(&plan, 1024 * 1024)?;
-
     // execute the query
-    let results = ctx.collect(plan.as_ref())?;
+    let batch_size = 4096;
+    let results = ctx.sql("SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col", batch_size)?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
new file mode 100644
index 0000000..645dea1
--- /dev/null
+++ b/rust/datafusion/src/dataframe.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFrame API for building and executing query plans.
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use crate::logicalplan::{Expr, LogicalPlan};
+use arrow::datatypes::Schema;
+use std::sync::Arc;
+
+/// DataFrame represents a logical set of rows with the same named columns.
+/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
+/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
+///
+/// DataFrames are typically created by the `read_csv` and `read_parquet` methods on the
+/// [ExecutionContext](../execution/context/struct.ExecutionContext.html) and can then be modified
+/// by calling the transformation methods, such as `filter`, `select`, `aggregate`, and `limit`
+/// to build up a query definition.
+///
+/// The query can be executed by calling the `collect` method.
+///
+/// ```
+/// use datafusion::ExecutionContext;
+/// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+/// use datafusion::logicalplan::col;
+///
+/// let mut ctx = ExecutionContext::new();
+/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
+///            .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
+///            .limit(100).unwrap();
+/// let results = df.collect(4096);
+/// ```
+pub trait DataFrame {
+    /// Select one or more columns by name. Returns a new DataFrame containing only the
+    /// specified columns.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = df.select_columns(vec!["a", "b"]).unwrap();
+    /// ```
+    fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn DataFrame>>;
+
+    /// Create a projection based on arbitrary expressions.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = df.select(vec![col("a").multiply(col("b")), col("c")]).unwrap();
+    /// ```
+    fn select(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;
+
+    /// Filter a DataFrame to only include rows that match the specified filter expression.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = df.filter(col("a").lt_eq(col("b"))).unwrap();
+    /// ```
+    fn filter(&self, expr: Expr) -> Result<Arc<dyn DataFrame>>;
+
+    /// Perform an aggregate query with optional grouping expressions.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    ///
+    /// // The following use is the equivalent of "SELECT MIN(b) FROM ... GROUP BY a"
+    /// let _ = df.aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap();
+    ///
+    /// // The following use is the equivalent of "SELECT MIN(b) FROM ..."
+    /// let _ = df.aggregate(vec![], vec![df.min(col("b")).unwrap()]).unwrap();
+    /// ```
+    fn aggregate(
+        &self,
+        group_expr: Vec<Expr>,
+        aggr_expr: Vec<Expr>,
+    ) -> Result<Arc<dyn DataFrame>>;
+
+    /// Limit the number of rows returned from this DataFrame.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = df.limit(100).unwrap();
+    /// ```
+    fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>>;
+
+    /// Executes this DataFrame and collects all results into a vector of RecordBatch.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let batches = df.collect(4096).unwrap();
+    /// ```
+    fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>>;
+
+    /// Returns the schema describing the output of this DataFrame in terms of columns returned,
+    /// where each column has a name, data type, and nullability attribute.
+    ///
+    /// ```
+    /// use datafusion::ExecutionContext;
+    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// use datafusion::logicalplan::col;
+    ///
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let schema = df.schema();
+    /// ```
+    fn schema(&self) -> &Schema;
+
+    /// Return the logical plan represented by this DataFrame.
+    fn to_logical_plan(&self) -> LogicalPlan;
+
+    /// Create an expression to represent the min() aggregate function
+    fn min(&self, expr: Expr) -> Result<Expr>;
+
+    /// Create an expression to represent the max() aggregate function
+    fn max(&self, expr: Expr) -> Result<Expr>;
+
+    /// Create an expression to represent the sum() aggregate function
+    fn sum(&self, expr: Expr) -> Result<Expr>;
+
+    /// Create an expression to represent the avg() aggregate function
+    fn avg(&self, expr: Expr) -> Result<Expr>;
+
+    /// Create an expression to represent the count() aggregate function
+    fn count(&self, expr: Expr) -> Result<Expr>;
+}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index cb0e608..c97906c 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -21,17 +21,19 @@ use std::collections::{HashMap, HashSet};
 use std::fs;
 use std::path::Path;
 use std::string::String;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::thread::{self, JoinHandle};
 
 use arrow::csv;
 use arrow::datatypes::*;
 use arrow::record_batch::RecordBatch;
 
+use crate::dataframe::DataFrame;
 use crate::datasource::csv::CsvFile;
 use crate::datasource::parquet::ParquetTable;
 use crate::datasource::TableProvider;
 use crate::error::{ExecutionError, Result};
+use crate::execution::dataframe_impl::DataFrameImpl;
 use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
 use crate::execution::physical_plan::datasource::DatasourceExec;
@@ -50,7 +52,6 @@ use crate::execution::physical_plan::selection::SelectionExec;
 use crate::execution::physical_plan::sort::{SortExec, SortOptions};
 use crate::execution::physical_plan::udf::{ScalarFunction, ScalarFunctionExpr};
 use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
-use crate::execution::table_impl::TableImpl;
 use crate::logicalplan::{
     Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder, PlanType,
     StringifiedPlan,
@@ -62,37 +63,45 @@ use crate::sql::{
     parser::{DFParser, FileType},
     planner::{SchemaProvider, SqlToRel},
 };
-use crate::table::Table;
 
-/// Configuration options for execution context
-#[derive(Copy, Clone)]
-pub struct ExecutionConfig {
-    /// Number of concurrent threads for query execution.
-    concurrency: usize,
-}
-
-impl ExecutionConfig {
-    /// Create an execution config with default settings
-    pub fn new() -> Self {
-        Self {
-            concurrency: num_cpus::get(),
-        }
-    }
-
-    /// Customize max_concurrency
-    pub fn with_concurrency(mut self, n: usize) -> Self {
-        // concurrency must be greater than zero
-        assert!(n > 0);
-        self.concurrency = n;
-        self
-    }
-}
-
-/// Execution context for registering data sources and executing queries
+/// ExecutionContext is the main interface for executing queries with DataFusion. The context
+/// provides the following functionality:
+///
+/// * Create a DataFrame from a CSV or Parquet data source.
+/// * Register a CSV or Parquet data source as a table that can be referenced from a SQL query.
+/// * Register a custom data source that can be referenced from a SQL query.
+/// * Execute a SQL query.
+///
+/// The following example demonstrates how to use the context to execute a query against a CSV
+/// data source:
+///
+/// ```
+/// use datafusion::ExecutionContext;
+/// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+/// use datafusion::logicalplan::col;
+///
+/// let mut ctx = ExecutionContext::new();
+/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
+///            .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
+///            .limit(100).unwrap();
+/// let results = df.collect(4096);
+/// ```
+///
+/// The following example demonstrates how to execute the same query using SQL:
+///
+/// ```
+/// use datafusion::ExecutionContext;
+/// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+/// use datafusion::logicalplan::col;
+///
+/// let mut ctx = ExecutionContext::new();
+/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
+/// let batch_size = 4096;
+/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100", batch_size).unwrap();
+/// ```
 pub struct ExecutionContext {
-    datasources: HashMap<String, Box<dyn TableProvider + Send + Sync>>,
-    scalar_functions: HashMap<String, Box<ScalarFunction>>,
-    config: ExecutionConfig,
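+    /// Internal state, shared with any DataFrames created from this context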
+    state: Arc<Mutex<ExecutionContextState>>,
 }
 
 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
@@ -105,17 +114,19 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 }
 
 impl ExecutionContext {
-    /// Create a new execution context for in-memory queries using default configs
+    /// Create a new execution context using a default configuration.
     pub fn new() -> Self {
         Self::with_config(ExecutionConfig::new())
     }
 
-    /// Create a new execution context for in-memory queries using provided configs
+    /// Create a new execution context using the provided configuration.
     pub fn with_config(config: ExecutionConfig) -> Self {
         let mut ctx = Self {
-            datasources: HashMap::new(),
-            scalar_functions: HashMap::new(),
-            config,
+            state: Arc::new(Mutex::new(ExecutionContextState {
+                datasources: Arc::new(Mutex::new(HashMap::new())),
+                scalar_functions: Arc::new(Mutex::new(HashMap::new())),
+                config,
+            })),
         };
         for udf in scalar_functions() {
             ctx.register_udf(udf);
@@ -123,6 +134,11 @@ impl ExecutionContext {
         ctx
     }
 
+    /// Create a context from existing context state
+    pub(crate) fn from(state: Arc<Mutex<ExecutionContextState>>) -> Self {
+        Self { state }
+    }
+
     /// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
     /// of RecordBatch instances)
     pub fn sql(&mut self, sql: &str, batch_size: usize) -> Result<Vec<RecordBatch>> {
@@ -132,7 +148,8 @@ impl ExecutionContext {
     }
 
     /// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
-    /// of RecordBatch instances)
+    /// of RecordBatch instances). This function is intended for internal use and should not be
+    /// called directly.
     pub fn collect_plan(
         &mut self,
         plan: &LogicalPlan,
@@ -174,7 +191,8 @@ impl ExecutionContext {
         }
     }
 
-    /// Creates a logical plan
+    /// Creates a logical plan. This function is intended for internal use and should not be
+    /// called directly.
     pub fn create_logical_plan(&mut self, sql: &str) -> Result<LogicalPlan> {
         let statements = DFParser::parse_sql(sql)?;
 
@@ -184,27 +202,73 @@ impl ExecutionContext {
             )));
         }
 
-        let schema_provider = ExecutionContextSchemaProvider {
-            datasources: &self.datasources,
-            scalar_functions: &self.scalar_functions,
-        };
-
         // create a query planner
-        let query_planner = SqlToRel::new(schema_provider);
+        let state = self.state.lock().expect("failed to lock mutex");
+        let query_planner = SqlToRel::new(state.clone());
         Ok(query_planner.statement_to_plan(&statements[0])?)
     }
 
     /// Register a scalar UDF
     pub fn register_udf(&mut self, f: ScalarFunction) {
-        self.scalar_functions.insert(f.name.clone(), Box::new(f));
+        let state = self.state.lock().expect("failed to lock mutex");
+        state
+            .scalar_functions
+            .lock()
+            .expect("failed to lock mutex")
+            .insert(f.name.clone(), Box::new(f));
     }
 
     /// Get a reference to the registered scalar functions
-    pub fn scalar_functions(&self) -> &HashMap<String, Box<ScalarFunction>> {
-        &self.scalar_functions
+    pub fn scalar_functions(&self) -> Arc<Mutex<HashMap<String, Box<ScalarFunction>>>> {
+        self.state
+            .lock()
+            .expect("failed to lock mutex")
+            .scalar_functions
+            .clone()
+    }
+
+    /// Creates a DataFrame for reading a CSV data source.
+    pub fn read_csv(
+        &mut self,
+        filename: &str,
+        options: CsvReadOptions,
+    ) -> Result<Arc<dyn DataFrame>> {
+        let csv = CsvFile::try_new(filename, options)?;
+
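+        // build a logical plan that scans the whole CSV file (no projection yet)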
+        let table_scan = LogicalPlan::CsvScan {
+            path: filename.to_string(),
+            schema: Box::new(csv.schema().as_ref().to_owned()),
+            has_header: options.has_header,
+            delimiter: Some(options.delimiter),
+            projection: None,
+            projected_schema: Box::new(csv.schema().as_ref().to_owned()),
+        };
+
+        Ok(Arc::new(DataFrameImpl::new(
+            self.state.clone(),
+            &LogicalPlanBuilder::from(&table_scan).build()?,
+        )))
+    }
+
+    /// Creates a DataFrame for reading a Parquet data source.
+    pub fn read_parquet(&mut self, filename: &str) -> Result<Arc<dyn DataFrame>> {
+        let parquet = ParquetTable::try_new(filename)?;
+
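+        // build a logical plan that scans the whole Parquet file (no projection yet)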
+        let table_scan = LogicalPlan::ParquetScan {
+            path: filename.to_string(),
+            schema: Box::new(parquet.schema().as_ref().to_owned()),
+            projection: None,
+            projected_schema: Box::new(parquet.schema().as_ref().to_owned()),
+        };
+
+        Ok(Arc::new(DataFrameImpl::new(
+            self.state.clone(),
+            &LogicalPlanBuilder::from(&table_scan).build()?,
+        )))
     }
 
-    /// Register a CSV file as a table so that it can be queried from SQL
+    /// Register a CSV data source so that it can be referenced from SQL statements
+    /// executed against this context.
     pub fn register_csv(
         &mut self,
         name: &str,
@@ -215,25 +279,42 @@ impl ExecutionContext {
         Ok(())
     }
 
-    /// Register a Parquet file as a table so that it can be queried from SQL
+    /// Register a Parquet data source so that it can be referenced from SQL statements
+    /// executed against this context.
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
         let table = ParquetTable::try_new(&filename)?;
         self.register_table(name, Box::new(table));
         Ok(())
     }
 
-    /// Register a table so that it can be queried from SQL
+    /// Register a table using a custom TableProvider so that it can be referenced from SQL
+    /// statements executed against this context.
     pub fn register_table(
         &mut self,
         name: &str,
         provider: Box<dyn TableProvider + Send + Sync>,
     ) {
-        self.datasources.insert(name.to_string(), provider);
+        let state = self.state.lock().expect("failed to lock mutex");
+        state
+            .datasources
+            .lock()
+            .expect("failed to lock mutex")
+            .insert(name.to_string(), provider);
     }
 
-    /// Get a table by name
-    pub fn table(&mut self, table_name: &str) -> Result<Arc<dyn Table>> {
-        match self.datasources.get(table_name) {
+    /// Retrieves a DataFrame representing a table previously registered by calling the
+    /// register_table function. An Err result will be returned if no table has been
+    /// registered with the provided name.
+    pub fn table(&mut self, table_name: &str) -> Result<Arc<dyn DataFrame>> {
+        match self
+            .state
+            .lock()
+            .expect("failed to lock mutex")
+            .datasources
+            .lock()
+            .expect("failed to lock mutex")
+            .get(table_name)
+        {
             Some(provider) => {
                 let schema = provider.schema().as_ref().clone();
                 let table_scan = LogicalPlan::TableScan {
@@ -243,7 +324,8 @@ impl ExecutionContext {
                     projected_schema: Box::new(schema),
                     projection: None,
                 };
-                Ok(Arc::new(TableImpl::new(
+                Ok(Arc::new(DataFrameImpl::new(
+                    self.state.clone(),
                     &LogicalPlanBuilder::from(&table_scan).build()?,
                 )))
             }
@@ -256,14 +338,28 @@ impl ExecutionContext {
 
     /// The set of available tables. Use `table` to get a specific table.
     pub fn tables(&self) -> HashSet<String> {
-        self.datasources.keys().cloned().collect()
+        self.state
+            .lock()
+            .expect("failed to lock mutex")
+            .datasources
+            .lock()
+            .expect("failed to lock mutex")
+            .keys()
+            .cloned()
+            .collect()
     }
 
     /// Optimize the logical plan by applying optimizer rules
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         let rules: Vec<Box<dyn OptimizerRule>> = vec![
             Box::new(ProjectionPushDown::new()),
-            Box::new(TypeCoercionRule::new(&self.scalar_functions)),
+            Box::new(TypeCoercionRule::new(
+                self.state
+                    .lock()
+                    .expect("failed to lock mutex")
+                    .scalar_functions
+                    .clone(),
+            )),
         ];
         let mut plan = plan.clone();
 
@@ -284,7 +380,15 @@ impl ExecutionContext {
                 table_name,
                 projection,
                 ..
-            } => match self.datasources.get(table_name) {
+            } => match self
+                .state
+                .lock()
+                .expect("failed to lock mutex")
+                .datasources
+                .lock()
+                .expect("failed to lock mutex")
+                .get(table_name)
+            {
                 Some(provider) => {
                     let partitions = provider.scan(projection, batch_size)?;
                     if partitions.is_empty() {
@@ -402,7 +506,11 @@ impl ExecutionContext {
                 let merge = Arc::new(MergeExec::new(
                     schema.clone(),
                     partitions,
-                    self.config.concurrency,
+                    self.state
+                        .lock()
+                        .expect("failed to lock mutex")
+                        .config
+                        .concurrency,
                 ));
 
                 // construct the expressions for the final aggregation
@@ -461,7 +569,11 @@ impl ExecutionContext {
                 Ok(Arc::new(SortExec::try_new(
                     sort_expr,
                     input,
-                    self.config.concurrency,
+                    self.state
+                        .lock()
+                        .expect("failed to lock mutex")
+                        .config
+                        .concurrency,
                 )?))
             }
             LogicalPlan::Limit { input, n, .. } => {
@@ -472,7 +584,11 @@ impl ExecutionContext {
                     input_schema.clone(),
                     input.partitions()?,
                     *n,
-                    self.config.concurrency,
+                    self.state
+                        .lock()
+                        .expect("failed to lock mutex")
+                        .config
+                        .concurrency,
                 )))
             }
             LogicalPlan::Explain {
@@ -533,7 +649,15 @@ impl ExecutionContext {
                 name,
                 args,
                 return_type,
-            } => match &self.scalar_functions.get(name) {
+            } => match &self
+                .state
+                .lock()
+                .expect("failed to lock mutex")
+                .scalar_functions
+                .lock()
+                .expect("failed to lock mutex")
+                .get(name)
+            {
                 Some(f) => {
                     let mut physical_args = vec![];
                     for e in args {
@@ -623,7 +747,11 @@ impl ExecutionContext {
                 let plan = MergeExec::new(
                     plan.schema().clone(),
                     partitions,
-                    self.config.concurrency,
+                    self.state
+                        .lock()
+                        .expect("failed to lock mutex")
+                        .config
+                        .concurrency,
                 );
                 let partitions = plan.partitions()?;
                 if partitions.len() == 1 {
@@ -682,39 +810,60 @@ impl ExecutionContext {
     }
 }
 
-/// Get schema and scalar functions for execution context
-pub struct ExecutionContextSchemaProvider<'a> {
-    datasources: &'a HashMap<String, Box<dyn TableProvider + Send + Sync>>,
-    scalar_functions: &'a HashMap<String, Box<ScalarFunction>>,
+/// Configuration options for execution context
+#[derive(Copy, Clone)]
+pub struct ExecutionConfig {
+    /// Number of concurrent threads for query execution.
+    concurrency: usize,
 }
 
-impl<'a> ExecutionContextSchemaProvider<'a> {
-    /// Create a new ExecutionContextSchemaProvider based on data sources and scalar functions
-    pub fn new(
-        datasources: &'a HashMap<String, Box<dyn TableProvider + Send + Sync>>,
-        scalar_functions: &'a HashMap<String, Box<ScalarFunction>>,
-    ) -> Self {
-        ExecutionContextSchemaProvider {
-            datasources,
-            scalar_functions,
+impl ExecutionConfig {
+    /// Create an execution config with default settings
+    pub fn new() -> Self {
+        Self {
+            concurrency: num_cpus::get(),
         }
     }
+
+    /// Customize max_concurrency
+    pub fn with_concurrency(mut self, n: usize) -> Self {
+        // concurrency must be greater than zero
+        assert!(n > 0);
+        self.concurrency = n;
+        self
+    }
+}
+
+/// Internal state of the execution context: the registered data sources and scalar
+/// functions, plus configuration options. This state is shared with any DataFrames
+/// created from the context.
+#[derive(Clone)]
+pub struct ExecutionContextState {
+    datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
+    scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+    config: ExecutionConfig,
 }
 
-impl SchemaProvider for ExecutionContextSchemaProvider<'_> {
+impl SchemaProvider for ExecutionContextState {
     fn get_table_meta(&self, name: &str) -> Option<SchemaRef> {
-        self.datasources.get(name).map(|ds| ds.schema().clone())
+        self.datasources
+            .lock()
+            .expect("failed to lock mutex")
+            .get(name)
+            .map(|ds| ds.schema().clone())
     }
 
     fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> {
-        self.scalar_functions.get(name).map(|f| {
-            Arc::new(FunctionMeta::new(
-                name.to_owned(),
-                f.args.clone(),
-                f.return_type.clone(),
-                FunctionType::Scalar,
-            ))
-        })
+        self.scalar_functions
+            .lock()
+            .expect("failed to lock mutex")
+            .get(name)
+            .map(|f| {
+                Arc::new(FunctionMeta::new(
+                    name.to_owned(),
+                    f.args.clone(),
+                    f.return_type.clone(),
+                    FunctionType::Scalar,
+                ))
+            })
     }
 }
 
@@ -824,7 +973,16 @@ mod tests {
         let tmp_dir = TempDir::new("execute")?;
         let ctx = create_ctx(&tmp_dir, 1)?;
 
-        let schema = ctx.datasources.get("test").unwrap().schema();
+        let schema = ctx
+            .state
+            .lock()
+            .expect("failed to lock mutex")
+            .datasources
+            .lock()
+            .expect("failed to lock mutex")
+            .get("test")
+            .unwrap()
+            .schema();
         assert_eq!(schema.field_with_name("c1")?.is_nullable(), false);
 
         let plan = LogicalPlanBuilder::scan("default", "test", schema.as_ref(), None)?
diff --git a/rust/datafusion/src/execution/table_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
similarity index 79%
rename from rust/datafusion/src/execution/table_impl.rs
rename to rust/datafusion/src/execution/dataframe_impl.rs
index 7494ba5..bc7afd5 100644
--- a/rust/datafusion/src/execution/table_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -15,33 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Implementation of Table API
+//! Implementation of DataFrame API
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use crate::arrow::datatypes::DataType;
 use crate::arrow::record_batch::RecordBatch;
+use crate::dataframe::*;
 use crate::error::{ExecutionError, Result};
-use crate::execution::context::ExecutionContext;
+use crate::execution::context::{ExecutionContext, ExecutionContextState};
 use crate::logicalplan::{col, Expr, LogicalPlan, LogicalPlanBuilder};
-use crate::table::*;
 use arrow::datatypes::Schema;
 
-/// Implementation of Table API
-pub struct TableImpl {
+/// Implementation of DataFrame API
+pub struct DataFrameImpl {
+    ctx_state: Arc<Mutex<ExecutionContextState>>,
     plan: LogicalPlan,
 }
 
-impl TableImpl {
+impl DataFrameImpl {
     /// Create a new Table based on an existing logical plan
-    pub fn new(plan: &LogicalPlan) -> Self {
-        Self { plan: plan.clone() }
+    pub fn new(ctx_state: Arc<Mutex<ExecutionContextState>>, plan: &LogicalPlan) -> Self {
+        Self {
+            ctx_state,
+            plan: plan.clone(),
+        }
     }
 }
 
-impl Table for TableImpl {
+impl DataFrame for DataFrameImpl {
     /// Apply a projection based on a list of column names
-    fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn Table>> {
+    fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn DataFrame>> {
         let exprs = columns
             .iter()
             .map(|name| {
@@ -57,17 +61,17 @@ impl Table for TableImpl {
     }
 
     /// Create a projection based on arbitrary expressions
-    fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn Table>> {
+    fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
         let plan = LogicalPlanBuilder::from(&self.plan)
             .project(expr_list)?
             .build()?;
-        Ok(Arc::new(TableImpl::new(&plan)))
+        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
     /// Create a selection based on a filter expression
-    fn filter(&self, expr: Expr) -> Result<Arc<dyn Table>> {
+    fn filter(&self, expr: Expr) -> Result<Arc<dyn DataFrame>> {
         let plan = LogicalPlanBuilder::from(&self.plan).filter(expr)?.build()?;
-        Ok(Arc::new(TableImpl::new(&plan)))
+        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
     /// Perform an aggregate query
@@ -75,47 +79,41 @@ impl Table for TableImpl {
         &self,
         group_expr: Vec<Expr>,
         aggr_expr: Vec<Expr>,
-    ) -> Result<Arc<dyn Table>> {
+    ) -> Result<Arc<dyn DataFrame>> {
         let plan = LogicalPlanBuilder::from(&self.plan)
             .aggregate(group_expr, aggr_expr)?
             .build()?;
-        Ok(Arc::new(TableImpl::new(&plan)))
+        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
     /// Limit the number of rows
-    fn limit(&self, n: usize) -> Result<Arc<dyn Table>> {
+    fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
         let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
-        Ok(Arc::new(TableImpl::new(&plan)))
-    }
-
-    /// Return an expression representing a column within this table
-    fn col(&self, name: &str) -> Result<Expr> {
-        self.plan.schema().index_of(name)?; // check that the column exists
-        Ok(col(name))
+        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
     /// Create an expression to represent the min() aggregate function
-    fn min(&self, expr: &Expr) -> Result<Expr> {
+    fn min(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("MIN", expr)
     }
 
     /// Create an expression to represent the max() aggregate function
-    fn max(&self, expr: &Expr) -> Result<Expr> {
+    fn max(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("MAX", expr)
     }
 
     /// Create an expression to represent the sum() aggregate function
-    fn sum(&self, expr: &Expr) -> Result<Expr> {
+    fn sum(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("SUM", expr)
     }
 
     /// Create an expression to represent the avg() aggregate function
-    fn avg(&self, expr: &Expr) -> Result<Expr> {
+    fn avg(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("AVG", expr)
     }
 
     /// Create an expression to represent the count() aggregate function
-    fn count(&self, expr: &Expr) -> Result<Expr> {
+    fn count(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("COUNT", expr)
     }
 
@@ -124,11 +122,8 @@ impl Table for TableImpl {
         self.plan.clone()
     }
 
-    fn collect(
-        &self,
-        ctx: &mut ExecutionContext,
-        batch_size: usize,
-    ) -> Result<Vec<RecordBatch>> {
+    fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>> {
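+        // execute the plan using a context that shares this DataFrame's state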
+        let mut ctx = ExecutionContext::from(self.ctx_state.clone());
         ctx.collect_plan(&self.plan.clone(), batch_size)
     }
 
@@ -138,7 +133,7 @@ impl Table for TableImpl {
     }
 }
 
-impl TableImpl {
+impl DataFrameImpl {
     /// Determine the data type for a given expression
     fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
         match expr {
@@ -156,8 +151,8 @@ impl TableImpl {
     }
 
     /// Create an expression to represent a named aggregate function
-    fn aggregate_expr(&self, name: &str, expr: &Expr) -> Result<Expr> {
-        let return_type = self.get_data_type(expr)?;
+    fn aggregate_expr(&self, name: &str, expr: Expr) -> Result<Expr> {
+        let return_type = self.get_data_type(&expr)?;
         Ok(Expr::AggregateFunction {
             name: name.to_string(),
             args: vec![expr.clone()],
@@ -193,7 +188,7 @@ mod tests {
     fn select_expr() -> Result<()> {
         // build plan using Table API
         let t = test_table()?;
-        let t2 = t.select(vec![t.col("c1")?, t.col("c2")?, t.col("c11")?])?;
+        let t2 = t.select(vec![col("c1"), col("c2"), col("c11")])?;
         let plan = t2.to_logical_plan();
 
         // build query using SQL
@@ -209,14 +204,13 @@ mod tests {
     fn aggregate() -> Result<()> {
         // build plan using Table API
         let t = test_table()?;
-        let group_expr = vec![t.col("c1")?];
-        let c12 = t.col("c12")?;
+        let group_expr = vec![col("c1")];
         let aggr_expr = vec![
-            t.min(&c12)?,
-            t.max(&c12)?,
-            t.avg(&c12)?,
-            t.sum(&c12)?,
-            t.count(&c12)?,
+            t.min(col("c12"))?,
+            t.max(col("c12"))?,
+            t.avg(col("c12"))?,
+            t.sum(col("c12"))?,
+            t.count(col("c12"))?,
         ];
 
         let t2 = t.aggregate(group_expr.clone(), aggr_expr.clone())?;
@@ -264,7 +258,7 @@ mod tests {
         ctx.create_logical_plan(sql)
     }
 
-    fn test_table() -> Result<Arc<dyn Table + 'static>> {
+    fn test_table() -> Result<Arc<dyn DataFrame + 'static>> {
         let mut ctx = ExecutionContext::new();
         register_aggregate_csv(&mut ctx)?;
         ctx.table("aggregate_test_100")
diff --git a/rust/datafusion/src/execution/mod.rs b/rust/datafusion/src/execution/mod.rs
index 5a0e752..21f3c3e 100644
--- a/rust/datafusion/src/execution/mod.rs
+++ b/rust/datafusion/src/execution/mod.rs
@@ -18,5 +18,5 @@
 //! DataFusion query execution
 
 pub mod context;
+pub mod dataframe_impl;
 pub mod physical_plan;
-pub mod table_impl;
diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs
index fb4e5af..73897ee 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/lib.rs
@@ -29,13 +29,15 @@
 extern crate arrow;
 extern crate sqlparser;
 
+pub mod dataframe;
 pub mod datasource;
 pub mod error;
 pub mod execution;
 pub mod logicalplan;
 pub mod optimizer;
 pub mod sql;
-pub mod table;
+
+pub use execution::context::ExecutionContext;
 
 #[cfg(test)]
 pub mod test;
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 55c9b14..5831eb9 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -436,57 +436,33 @@ impl Expr {
     }
 
     /// Equal
-    pub fn eq(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::Eq,
-            right: Box::new(other.clone()),
-        }
+    pub fn eq(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Eq, other)
     }
 
     /// Not equal
-    pub fn not_eq(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::NotEq,
-            right: Box::new(other.clone()),
-        }
+    pub fn not_eq(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::NotEq, other)
     }
 
     /// Greater than
-    pub fn gt(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::Gt,
-            right: Box::new(other.clone()),
-        }
+    pub fn gt(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Gt, other)
     }
 
     /// Greater than or equal to
-    pub fn gt_eq(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::GtEq,
-            right: Box::new(other.clone()),
-        }
+    pub fn gt_eq(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::GtEq, other)
     }
 
     /// Less than
-    pub fn lt(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::Lt,
-            right: Box::new(other.clone()),
-        }
+    pub fn lt(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Lt, other)
     }
 
     /// Less than or equal to
-    pub fn lt_eq(&self, other: &Expr) -> Expr {
-        Expr::BinaryExpr {
-            left: Box::new(self.clone()),
-            op: Operator::LtEq,
-            right: Box::new(other.clone()),
-        }
+    pub fn lt_eq(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::LtEq, other)
     }
 
     /// Not
@@ -494,12 +470,45 @@ impl Expr {
         Expr::Not(Box::new(self.clone()))
     }
 
+    /// Add the specified expression
+    pub fn plus(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Plus, other)
+    }
+
+    /// Subtract the specified expression
+    pub fn minus(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Minus, other)
+    }
+
+    /// Multiply by the specified expression
+    pub fn multiply(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Multiply, other)
+    }
+
+    /// Divide by the specified expression
+    pub fn divide(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Divide, other)
+    }
+
+    /// Calculate the modulus of two expressions
+    pub fn modulus(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Modulus, other)
+    }
+
     /// Alias
     pub fn alias(&self, name: &str) -> Expr {
         Expr::Alias(Box::new(self.clone()), name.to_owned())
     }
 }
 
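+/// Build a BinaryExpr from the given left operand, operator, and right operand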
+fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
+    Expr::BinaryExpr {
+        left: Box::new(l),
+        op,
+        right: Box::new(r),
+    }
+}
+
 /// Create a column expression based on a column name
 pub fn col(name: &str) -> Expr {
     Expr::Column(name.to_owned())
@@ -1222,7 +1231,7 @@ mod tests {
             &employee_schema(),
             Some(vec![0, 3]),
         )?
-        .filter(col("state").eq(&lit("CO")))?
+        .filter(col("state").eq(lit("CO")))?
         .project(vec![col("id")])?
         .build()?;
 
@@ -1242,7 +1251,7 @@ mod tests {
             CsvReadOptions::new().schema(&employee_schema()),
             Some(vec![0, 3]),
         )?
-        .filter(col("state").eq(&lit("CO")))?
+        .filter(col("state").eq(lit("CO")))?
         .project(vec![col("id")])?
         .build()?;
 
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index a3e1ab9..a0926db 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -21,6 +21,7 @@
 //! float)`. This keeps the runtime query execution code much simpler.
 
 use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
 
 use arrow::datatypes::Schema;
 
@@ -33,14 +34,16 @@ use crate::optimizer::utils;
 use utils::optimize_explain;
 
 /// Implementation of type coercion optimizer rule
-pub struct TypeCoercionRule<'a> {
-    scalar_functions: &'a HashMap<String, Box<ScalarFunction>>,
+pub struct TypeCoercionRule {
+    scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
 }
 
-impl<'a> TypeCoercionRule<'a> {
+impl TypeCoercionRule {
     /// Create a new type coercion optimizer rule using meta-data about registered
     /// scalar functions
-    pub fn new(scalar_functions: &'a HashMap<String, Box<ScalarFunction>>) -> Self {
+    pub fn new(
+        scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+    ) -> Self {
         Self { scalar_functions }
     }
 
@@ -85,7 +88,12 @@ impl<'a> TypeCoercionRule<'a> {
                 return_type,
             } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
-                match self.scalar_functions.get(name) {
+                match self
+                    .scalar_functions
+                    .lock()
+                    .expect("failed to lock mutex")
+                    .get(name)
+                {
                     Some(func_meta) => {
                         let mut func_args = Vec::with_capacity(args.len());
                         for i in 0..args.len() {
@@ -143,7 +151,7 @@ impl<'a> TypeCoercionRule<'a> {
     }
 }
 
-impl<'a> OptimizerRule for TypeCoercionRule<'a> {
+impl OptimizerRule for TypeCoercionRule {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Projection { expr, input, .. } => {
@@ -215,7 +223,7 @@ mod tests {
         let options = CsvReadOptions::new().schema_infer_max_records(100);
         let plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
             // filter clause needs the type coercion rule applied
-            .filter(col("c7").lt(&lit(5_u8)))?
+            .filter(col("c7").lt(lit(5_u8)))?
             .project(vec![col("c1"), col("c2")])?
             .aggregate(
                 vec![col("c1")],
@@ -226,7 +234,7 @@ mod tests {
             .build()?;
 
         let scalar_functions = HashMap::new();
-        let mut rule = TypeCoercionRule::new(&scalar_functions);
+        let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
         let plan = rule.optimize(&plan)?;
 
         // check that the filter had a cast added
@@ -249,11 +257,11 @@ mod tests {
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
         let plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
-            .filter(col("c7").lt(&col("c12")))?
+            .filter(col("c7").lt(col("c12")))?
             .build()?;
 
         let scalar_functions = HashMap::new();
-        let mut rule = TypeCoercionRule::new(&scalar_functions);
+        let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
         let plan = rule.optimize(&plan)?;
 
         assert!(
diff --git a/rust/datafusion/src/table.rs b/rust/datafusion/src/table.rs
deleted file mode 100644
index bd04f07..0000000
--- a/rust/datafusion/src/table.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Table API for building a logical query plan. This is similar to the Table API in Ibis
-//! and the DataFrame API in Apache Spark
-
-use crate::arrow::record_batch::RecordBatch;
-use crate::error::Result;
-use crate::execution::context::ExecutionContext;
-use crate::logicalplan::{Expr, LogicalPlan};
-use arrow::datatypes::Schema;
-use std::sync::Arc;
-
-/// Table is an abstraction of a logical query plan
-pub trait Table {
-    /// Select columns by name
-    fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn Table>>;
-
-    /// Create a projection based on arbitrary expressions
-    fn select(&self, expr: Vec<Expr>) -> Result<Arc<dyn Table>>;
-
-    /// Create a selection based on a filter expression
-    fn filter(&self, expr: Expr) -> Result<Arc<dyn Table>>;
-
-    /// Perform an aggregate query
-    fn aggregate(
-        &self,
-        group_expr: Vec<Expr>,
-        aggr_expr: Vec<Expr>,
-    ) -> Result<Arc<dyn Table>>;
-
-    /// limit the number of rows
-    fn limit(&self, n: usize) -> Result<Arc<dyn Table>>;
-
-    /// Return the logical plan
-    fn to_logical_plan(&self) -> LogicalPlan;
-
-    /// Return an expression representing a column within this table
-    fn col(&self, name: &str) -> Result<Expr>;
-
-    /// Create an expression to represent the min() aggregate function
-    fn min(&self, expr: &Expr) -> Result<Expr>;
-
-    /// Create an expression to represent the max() aggregate function
-    fn max(&self, expr: &Expr) -> Result<Expr>;
-
-    /// Create an expression to represent the sum() aggregate function
-    fn sum(&self, expr: &Expr) -> Result<Expr>;
-
-    /// Create an expression to represent the avg() aggregate function
-    fn avg(&self, expr: &Expr) -> Result<Expr>;
-
-    /// Create an expression to represent the count() aggregate function
-    fn count(&self, expr: &Expr) -> Result<Expr>;
-
-    /// Collects the result as a vector of RecordBatch.
-    fn collect(
-        &self,
-        ctx: &mut ExecutionContext,
-        batch_size: usize,
-    ) -> Result<Vec<RecordBatch>>;
-
-    /// Returns the schema
-    fn schema(&self) -> &Schema;
-}
diff --git a/rust/datafusion/tests/example.csv b/rust/datafusion/tests/example.csv
new file mode 100644
index 0000000..0eadb69
--- /dev/null
+++ b/rust/datafusion/tests/example.csv
@@ -0,0 +1,2 @@
+a,b,c
+1,2,3
\ No newline at end of file