You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2019/04/15 11:23:33 UTC

[arrow] branch master updated: ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5d687  ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion
cc5d687 is described below

commit cc5d6876206e457e562820e054864c2ec6464064
Author: Zhiyuan Zheng <zh...@yandex.com>
AuthorDate: Mon Apr 15 13:23:11 2019 +0200

    ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion
    
    This pr contains a REPL implementation of DataFusion and create a Dockerfile for it.
    
    Which achieves the following workflow w/o coding:
    [https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d](https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d)
    
    Known Issue:
    1. Don't support `unsigned` data types since `sqlparser-rs` haven't supported yet.
    2. I don't know how to push docker images to `apache/arrow-datafusion`. Help wanted here @andygrove
    
    Author: Zhiyuan Zheng <zh...@yandex.com>
    
    Closes #4147 from zhzy0077/master and squashes the following commits:
    
    deb83a75f <Zhiyuan Zheng> add dockerfile & update readme.md
    1ca46d7d0 <Zhiyuan Zheng> Remove debug infos & add tests for create external table.
    babecc6cd <Zhiyuan Zheng> add repl for DataFusion
---
 rust/.gitignore                                    |   2 +
 rust/datafusion/Cargo.toml                         |   8 +-
 rust/datafusion/Dockerfile                         |  24 +++
 rust/datafusion/README.md                          |  31 ++++
 rust/datafusion/src/bin/repl.rs                    | 203 +++++++++++++++++++++
 rust/datafusion/src/execution/context.rs           | 103 ++++++++++-
 rust/datafusion/src/execution/mod.rs               |   1 +
 rust/datafusion/src/execution/scalar_relation.rs   |  65 +++++++
 rust/datafusion/src/logicalplan.rs                 |  18 ++
 .../src/optimizer/projection_push_down.rs          |  13 ++
 rust/datafusion/src/sql/parser.rs                  |  24 ++-
 rust/datafusion/tests/sql.rs                       |  53 ++++++
 12 files changed, 524 insertions(+), 21 deletions(-)

diff --git a/rust/.gitignore b/rust/.gitignore
index fa8d85a..6580b8e 100644
--- a/rust/.gitignore
+++ b/rust/.gitignore
@@ -1,2 +1,4 @@
 Cargo.lock
 target
+
+.history
\ No newline at end of file
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 8dcb657..202a98b 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -34,15 +34,21 @@ edition = "2018"
 name = "datafusion"
 path = "src/lib.rs"
 
+[[bin]]
+name = "datafusion-cli"
+path = "src/bin/repl.rs"
+
 [dependencies]
 fnv = "1.0.3"
 arrow = { path = "../arrow" }
 parquet = { path = "../parquet" }
-datafusion-rustyline = "2.0.0-alpha-20180628"
 serde = { version = "1.0.80", features = ["rc"] }
 serde_derive = "1.0.80"
 serde_json = "1.0.33"
 sqlparser = "0.2.0"
+clap = "2.33.0"
+rustyline = "3.0.0"
+prettytable-rs = "0.8.0"
 
 [dev-dependencies]
 criterion = "0.2.0"
diff --git a/rust/datafusion/Dockerfile b/rust/datafusion/Dockerfile
new file mode 100644
index 0000000..8b16313
--- /dev/null
+++ b/rust/datafusion/Dockerfile
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM rustlang/rust:nightly
+
+COPY rust /arrow/rust/
+WORKDIR /arrow/rust/datafusion
+RUN cargo install --bin datafusion-cli --path .
+
+CMD ["datafusion-cli", "--data-path", "/data"]
diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md
index 925a0d2..654f5ba 100644
--- a/rust/datafusion/README.md
+++ b/rust/datafusion/README.md
@@ -23,6 +23,8 @@ DataFusion is an in-memory query engine that uses Apache Arrow as the memory mod
 
 ## Usage
 
+
+#### Use as a lib
 Add this to your Cargo.toml:
 
 ```toml
@@ -30,6 +32,35 @@ Add this to your Cargo.toml:
 datafusion = "0.14.0-SNAPSHOT"
 ```
 
+#### Use as a bin
+##### Build your own bin(requires rust toolchains)
+```sh
+git clone https://github/apache/arrow
+cd arrow/rust/datafusion
+cargo run --bin datafusion-cli
+```
+##### Use Dockerfile
+```sh
+git clone https://github/apache/arrow
+cd arrow
+docker build -f rust/datafusion/Dockerfile . --tag datafusion-cli
+docker run -it -v $(your_data_location):/data datafusion-cli
+```
+
+```
+USAGE:
+    datafusion-cli [OPTIONS]
+
+FLAGS:
+    -h, --help       Prints help information
+    -V, --version    Prints version information
+
+OPTIONS:
+    -c, --batch-size <batch-size>    The batch size of each query, default value is 1048576
+    -p, --data-path <data-path>      Path to your data, default to current directory
+```
+
+
 # Status
 
 ## General
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
new file mode 100644
index 0000000..ac73b26
--- /dev/null
+++ b/rust/datafusion/src/bin/repl.rs
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate clap;
+
+use arrow::array::*;
+use arrow::datatypes::{DataType, TimeUnit};
+use clap::{App, Arg};
+use datafusion::error::{ExecutionError, Result};
+use datafusion::execution::context::ExecutionContext;
+use datafusion::execution::relation::Relation;
+use prettytable::{Cell, Row, Table};
+use rustyline::Editor;
+use std::cell::RefMut;
+use std::env;
+use std::path::Path;
+
+fn main() {
+    let matches = App::new("DataFusion")
+        .version(crate_version!())
+        .about(
+            "DataFusion is an in-memory query engine that uses Apache Arrow \
+             as the memory model. It supports executing SQL queries against CSV and \
+             Parquet files as well as querying directly against in-memory data.",
+        )
+        .arg(
+            Arg::with_name("data-path")
+                .help("Path to your data, default to current directory")
+                .short("p")
+                .long("data-path")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("batch-size")
+                .help("The batch size of each query, default value is 1048576")
+                .short("c")
+                .long("batch-size")
+                .takes_value(true),
+        )
+        .get_matches();
+
+    if let Some(path) = matches.value_of("data-path") {
+        let p = Path::new(path);
+        env::set_current_dir(&p).unwrap();
+    };
+
+    let batch_size = matches
+        .value_of("batch-size")
+        .map(|size| size.parse::<usize>().unwrap())
+        .unwrap_or(1_048_576);
+
+    let mut ctx = ExecutionContext::new();
+
+    let mut rl = Editor::<()>::new();
+    rl.load_history(".history").ok();
+
+    let mut query = "".to_owned();
+    loop {
+        let readline = rl.readline("> ");
+        match readline {
+            Ok(ref line) if line.trim_end().ends_with(';') => {
+                query.push_str(line.trim_end());
+                rl.add_history_entry(query.clone());
+                match exec_and_print(&mut ctx, query, batch_size) {
+                    Ok(_) => {}
+                    Err(err) => println!("{:?}", err),
+                }
+                query = "".to_owned();
+            }
+            Ok(ref line) => {
+                query.push_str(line);
+                query.push_str(" ");
+            }
+            Err(_) => {
+                break;
+            }
+        }
+    }
+
+    rl.save_history(".history").ok();
+}
+
+fn exec_and_print(
+    ctx: &mut ExecutionContext,
+    sql: String,
+    batch_size: usize,
+) -> Result<()> {
+    let relation = ctx.sql(&sql, batch_size)?;
+    print_result(relation.borrow_mut())?;
+
+    Ok(())
+}
+
+fn print_result(mut results: RefMut<Relation>) -> Result<()> {
+    let mut row_count = 0;
+    let mut table = Table::new();
+    let schema = results.schema();
+
+    let mut header = Vec::new();
+    for field in schema.fields() {
+        header.push(Cell::new(&field.name()));
+    }
+    table.add_row(Row::new(header));
+
+    while let Some(batch) = results.next().unwrap() {
+        row_count += batch.num_rows();
+
+        for row in 0..batch.num_rows() {
+            let mut cells = Vec::new();
+            for col in 0..batch.num_columns() {
+                let column = batch.column(col);
+                cells.push(Cell::new(&str_value(column.clone(), row)?));
+            }
+            table.add_row(Row::new(cells));
+        }
+    }
+    table.printstd();
+
+    if row_count > 1 {
+        println!("{} rows in set.", row_count);
+    } else {
+        println!("{} row in set.", row_count);
+    }
+
+    Ok(())
+}
+
+macro_rules! make_string {
+    ($array_type:ty, $column: ident, $row: ident) => {{
+        Ok($column
+            .as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .value($row)
+            .to_string())
+    }};
+}
+
+fn str_value(column: ArrayRef, row: usize) -> Result<String> {
+    match column.data_type() {
+        DataType::Utf8 => Ok(column
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap()
+            .get_string(row)),
+        DataType::Boolean => make_string!(BooleanArray, column, row),
+        DataType::Int16 => make_string!(Int16Array, column, row),
+        DataType::Int32 => make_string!(Int32Array, column, row),
+        DataType::Int64 => make_string!(Int64Array, column, row),
+        DataType::UInt8 => make_string!(UInt8Array, column, row),
+        DataType::UInt16 => make_string!(UInt16Array, column, row),
+        DataType::UInt32 => make_string!(UInt32Array, column, row),
+        DataType::UInt64 => make_string!(UInt64Array, column, row),
+        DataType::Float16 => make_string!(Float32Array, column, row),
+        DataType::Float32 => make_string!(Float32Array, column, row),
+        DataType::Float64 => make_string!(Float64Array, column, row),
+        DataType::Timestamp(unit) if *unit == TimeUnit::Second => {
+            make_string!(TimestampSecondArray, column, row)
+        }
+        DataType::Timestamp(unit) if *unit == TimeUnit::Millisecond => {
+            make_string!(TimestampMillisecondArray, column, row)
+        }
+        DataType::Timestamp(unit) if *unit == TimeUnit::Microsecond => {
+            make_string!(TimestampMicrosecondArray, column, row)
+        }
+        DataType::Timestamp(unit) if *unit == TimeUnit::Nanosecond => {
+            make_string!(TimestampNanosecondArray, column, row)
+        }
+        DataType::Date32(_) => make_string!(Date32Array, column, row),
+        DataType::Date64(_) => make_string!(Date64Array, column, row),
+        DataType::Time32(unit) if *unit == TimeUnit::Second => {
+            make_string!(Time32SecondArray, column, row)
+        }
+        DataType::Time32(unit) if *unit == TimeUnit::Millisecond => {
+            make_string!(Time32MillisecondArray, column, row)
+        }
+        DataType::Time32(unit) if *unit == TimeUnit::Microsecond => {
+            make_string!(Time64MicrosecondArray, column, row)
+        }
+        DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
+            make_string!(Time64NanosecondArray, column, row)
+        }
+        _ => Err(ExecutionError::ExecutionError(format!(
+            "Unsupported {:?} type for repl.",
+            column.data_type()
+        ))),
+    }
+}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 1618e6a..1bebdb5 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -25,6 +25,8 @@ use std::sync::Arc;
 
 use arrow::datatypes::*;
 
+use crate::arrow::array::ArrayRef;
+use crate::arrow::builder::BooleanBuilder;
 use crate::datasource::csv::CsvFile;
 use crate::datasource::TableProvider;
 use crate::error::{ExecutionError, Result};
@@ -34,15 +36,18 @@ use crate::execution::filter::FilterRelation;
 use crate::execution::limit::LimitRelation;
 use crate::execution::projection::ProjectRelation;
 use crate::execution::relation::{DataSourceRelation, Relation};
+use crate::execution::scalar_relation::ScalarRelation;
 use crate::execution::table_impl::TableImpl;
 use crate::logicalplan::*;
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::type_coercion::TypeCoercionRule;
 use crate::optimizer::utils;
+use crate::sql::parser::FileType;
 use crate::sql::parser::{DFASTNode, DFParser};
 use crate::sql::planner::{SchemaProvider, SqlToRel};
 use crate::table::Table;
+use sqlparser::sqlast::{SQLColumnDef, SQLType};
 
 /// Execution context for registering data sources and executing queries
 pub struct ExecutionContext {
@@ -50,7 +55,7 @@ pub struct ExecutionContext {
 }
 
 impl ExecutionContext {
-    /// Create a new excution context for in-memory queries
+    /// Create a new execution context for in-memory queries
     pub fn new() -> Self {
         Self {
             datasources: Rc::new(RefCell::new(HashMap::new())),
@@ -83,9 +88,61 @@ impl ExecutionContext {
 
                 Ok(self.optimize(&plan)?)
             }
-            other => Err(ExecutionError::General(format!(
-                "Cannot create logical plan from {:?}",
-                other
+            DFASTNode::CreateExternalTable {
+                name,
+                columns,
+                file_type,
+                header_row,
+                location,
+            } => {
+                let schema = Arc::new(self.build_schema(columns)?);
+
+                Ok(Arc::new(LogicalPlan::CreateExternalTable {
+                    schema,
+                    name,
+                    location,
+                    file_type,
+                    header_row,
+                }))
+            }
+        }
+    }
+
+    fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
+        let mut fields = Vec::new();
+
+        for column in columns {
+            let data_type = self.make_data_type(column.data_type)?;
+            fields.push(Field::new(&column.name, data_type, column.allow_null));
+        }
+
+        Ok(Schema::new(fields))
+    }
+
+    fn make_data_type(&self, sql_type: SQLType) -> Result<DataType> {
+        match sql_type {
+            SQLType::BigInt => Ok(DataType::Int64),
+            SQLType::Int => Ok(DataType::Int32),
+            SQLType::SmallInt => Ok(DataType::Int16),
+            SQLType::Char(_) | SQLType::Varchar(_) | SQLType::Text => Ok(DataType::Utf8),
+            SQLType::Decimal(_, _) => Ok(DataType::Float64),
+            SQLType::Float(_) => Ok(DataType::Float32),
+            SQLType::Real | SQLType::Double => Ok(DataType::Float64),
+            SQLType::Boolean => Ok(DataType::Boolean),
+            SQLType::Date => Ok(DataType::Date64(DateUnit::Day)),
+            SQLType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)),
+            SQLType::Timestamp => Ok(DataType::Date64(DateUnit::Millisecond)),
+            SQLType::Uuid
+            | SQLType::Clob(_)
+            | SQLType::Binary(_)
+            | SQLType::Varbinary(_)
+            | SQLType::Blob(_)
+            | SQLType::Regclass
+            | SQLType::Bytea
+            | SQLType::Custom(_)
+            | SQLType::Array(_) => Err(ExecutionError::General(format!(
+                "Unsupported data type: {:?}.",
+                sql_type
             ))),
         }
     }
@@ -110,7 +167,7 @@ impl ExecutionContext {
 
     /// Get a table by name
     pub fn table(&mut self, table_name: &str) -> Result<Arc<Table>> {
-        match self.datasources.borrow().get(table_name) {
+        match (*self.datasources).borrow().get(table_name) {
             Some(provider) => {
                 Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan {
                     schema_name: "".to_string(),
@@ -151,7 +208,7 @@ impl ExecutionContext {
                 ref table_name,
                 ref projection,
                 ..
-            } => match self.datasources.borrow().get(table_name) {
+            } => match (*self.datasources).borrow().get(table_name) {
                 Some(provider) => {
                     let ds = provider.scan(projection, batch_size)?;
                     if ds.len() == 1 {
@@ -277,6 +334,38 @@ impl ExecutionContext {
                 }
             }
 
+            LogicalPlan::CreateExternalTable {
+                ref schema,
+                ref name,
+                ref location,
+                ref file_type,
+                ref header_row,
+            } => {
+                match file_type {
+                    FileType::CSV => {
+                        self.register_csv(name, location, schema, *header_row)
+                    }
+                    _ => {
+                        return Err(ExecutionError::ExecutionError(format!(
+                            "Unsupported file type {:?}.",
+                            file_type
+                        )));
+                    }
+                }
+                let mut builder = BooleanBuilder::new(1);
+                builder.append_value(true)?;
+
+                let columns = vec![Arc::new(builder.finish()) as ArrayRef];
+                Ok(Rc::new(RefCell::new(ScalarRelation::new(
+                    Arc::new(Schema::new(vec![Field::new(
+                        "result",
+                        DataType::Boolean,
+                        false,
+                    )])),
+                    columns,
+                ))))
+            }
+
             _ => Err(ExecutionError::NotImplemented(
                 "Unsupported logical plan for execution".to_string(),
             )),
@@ -289,7 +378,7 @@ struct ExecutionContextSchemaProvider {
 }
 impl SchemaProvider for ExecutionContextSchemaProvider {
     fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> {
-        match self.datasources.borrow().get(name) {
+        match (*self.datasources).borrow().get(name) {
             Some(ds) => Some(ds.schema().clone()),
             None => None,
         }
diff --git a/rust/datafusion/src/execution/mod.rs b/rust/datafusion/src/execution/mod.rs
index cfd748a..d4f57a7 100644
--- a/rust/datafusion/src/execution/mod.rs
+++ b/rust/datafusion/src/execution/mod.rs
@@ -24,4 +24,5 @@ pub mod filter;
 pub mod limit;
 pub mod projection;
 pub mod relation;
+pub mod scalar_relation;
 pub mod table_impl;
diff --git a/rust/datafusion/src/execution/scalar_relation.rs b/rust/datafusion/src/execution/scalar_relation.rs
new file mode 100644
index 0000000..96e3118
--- /dev/null
+++ b/rust/datafusion/src/execution/scalar_relation.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Scalar relation, emit one fixed scalar value.
+
+use crate::error::Result;
+use crate::execution::relation::Relation;
+use arrow::array::ArrayRef;
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use std::sync::Arc;
+
+/// A relation emit single scalar array;
+pub(super) struct ScalarRelation {
+    /// The schema for the limit relation, which is always the same as the schema of the input relation
+    schema: Arc<Schema>,
+
+    value: Vec<ArrayRef>,
+
+    /// The number of rows that have been returned so far
+    emitted: bool,
+}
+
+impl ScalarRelation {
+    pub fn new(schema: Arc<Schema>, value: Vec<ArrayRef>) -> Self {
+        Self {
+            schema,
+            value,
+            emitted: false,
+        }
+    }
+}
+
+impl Relation for ScalarRelation {
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        if self.emitted {
+            return Ok(None);
+        }
+
+        self.emitted = true;
+
+        Ok(Some(RecordBatch::try_new(
+            self.schema().clone(),
+            self.value.clone(),
+        )?))
+    }
+
+    fn schema(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+}
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index c2ac37b..8e69056 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType, Field, Schema};
 
 use crate::error::{ExecutionError, Result};
 use crate::optimizer::utils;
+use crate::sql::parser::FileType;
 
 /// Enumeration of supported function types (Scalar and Aggregate)
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -441,6 +442,19 @@ pub enum LogicalPlan {
         /// The schema description
         schema: Arc<Schema>,
     },
+    /// Represents a create external table expression.
+    CreateExternalTable {
+        /// The table schema
+        schema: Arc<Schema>,
+        /// The table name
+        name: String,
+        /// The physical location
+        location: String,
+        /// The file type of physical file
+        file_type: FileType,
+        /// Whether the CSV file contains a header
+        header_row: bool,
+    },
 }
 
 impl LogicalPlan {
@@ -454,6 +468,7 @@ impl LogicalPlan {
             LogicalPlan::Aggregate { schema, .. } => &schema,
             LogicalPlan::Sort { schema, .. } => &schema,
             LogicalPlan::Limit { schema, .. } => &schema,
+            LogicalPlan::CreateExternalTable { schema, .. } => &schema,
         }
     }
 }
@@ -530,6 +545,9 @@ impl LogicalPlan {
                 write!(f, "Limit: {:?}", expr)?;
                 input.fmt_with_indent(f, indent + 1)
             }
+            LogicalPlan::CreateExternalTable { ref name, .. } => {
+                write!(f, "CreateExternalTable: {:?}", name)
+            }
         }
     }
 }
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index a75982f..52ac440 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -191,6 +191,19 @@ impl ProjectionPushDown {
                 input: input.clone(),
                 schema: schema.clone(),
             })),
+            LogicalPlan::CreateExternalTable {
+                schema,
+                name,
+                location,
+                file_type,
+                header_row,
+            } => Ok(Arc::new(LogicalPlan::CreateExternalTable {
+                schema: schema.clone(),
+                name: name.to_string(),
+                location: location.to_string(),
+                file_type: file_type.clone(),
+                header_row: *header_row,
+            })),
         }
     }
 
diff --git a/rust/datafusion/src/sql/parser.rs b/rust/datafusion/src/sql/parser.rs
index dc46ce0..74ae4cf 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -32,7 +32,7 @@ macro_rules! parser_err {
 }
 
 /// Types of files to parse as DataFrames
-#[derive(Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum FileType {
     /// Newline-delimited JSON
     NdJson,
@@ -134,18 +134,16 @@ impl DFParser {
                                         true
                                     };
 
-                                    match self.parser.peek_token() {
-                                        Some(Token::Comma) => {
-                                            self.parser.next_token();
-                                            columns.push(SQLColumnDef {
-                                                name: column_name,
-                                                data_type: data_type,
-                                                allow_null,
-                                                default: None,
-                                                is_primary: false,
-                                                is_unique: false,
-                                            });
-                                        }
+                                    columns.push(SQLColumnDef {
+                                        name: column_name,
+                                        data_type: data_type,
+                                        allow_null,
+                                        default: None,
+                                        is_primary: false,
+                                        is_unique: false,
+                                    });
+                                    match self.parser.next_token() {
+                                        Some(Token::Comma) => continue,
                                         Some(Token::RParen) => break,
                                         _ => {
                                             return parser_err!(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 81c52c9..566fecf 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -241,6 +241,26 @@ fn csv_query_limit_zero() {
     assert_eq!(expected, actual);
 }
 
+#[test]
+fn csv_query_create_external_table() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+    let sql = "SELECT c1, c2, c3, c4, c5, c6, c7, c8, c9, 10, c11, c12, c13 FROM aggregate_test_100 LIMIT 1";
+    let actual = execute(&mut ctx, sql);
+    let expected = "\"c\"\t2\t1\t18109\t2033001162\t-6513304855495910254\t25\t43062\t1491205016\t10\t0.110830784\t0.9294097332465232\t\"6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW\"\n".to_string();
+    assert_eq!(expected, actual);
+}
+
+#[test]
+fn csv_query_external_table_count() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+    let sql = "SELECT COUNT(c12) FROM aggregate_test_100";
+    let actual = execute(&mut ctx, sql);
+    let expected = "100\n".to_string();
+    assert_eq!(expected, actual);
+}
+
 //TODO Uncomment the following test when ORDER BY is implemented to be able to test ORDER
 // BY + LIMIT
 /*
@@ -273,6 +293,39 @@ fn aggr_test_schema() -> Arc<Schema> {
     ]))
 }
 
+fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
+    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+
+    // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once unsigned is supported.
+    ctx.sql(
+        &format!(
+            "
+    CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  INT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT NOT NULL,
+        c5  INT NOT NULL,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    WITH HEADER ROW
+    LOCATION '{}/csv/aggregate_test_100.csv'
+    ",
+            testdata
+        ),
+        1024,
+    )
+    .unwrap();
+}
+
 fn register_aggregate_csv(ctx: &mut ExecutionContext) {
     let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
     let schema = aggr_test_schema();