You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2019/04/15 11:23:33 UTC
[arrow] branch master updated: ARROW-4467: [Rust] [DataFusion]
Create a REPL & Dockerfile for DataFusion
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new cc5d687 ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion
cc5d687 is described below
commit cc5d6876206e457e562820e054864c2ec6464064
Author: Zhiyuan Zheng <zh...@yandex.com>
AuthorDate: Mon Apr 15 13:23:11 2019 +0200
ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion
This PR contains a REPL implementation of DataFusion and creates a Dockerfile for it.
Together they enable the following workflow without writing any code:
[https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d](https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d)
Known Issue:
1. `unsigned` data types are not supported yet, because `sqlparser-rs` does not support them.
2. I don't know how to push docker images to `apache/arrow-datafusion`. Help wanted here @andygrove
Author: Zhiyuan Zheng <zh...@yandex.com>
Closes #4147 from zhzy0077/master and squashes the following commits:
deb83a75f <Zhiyuan Zheng> add dockerfile & update readme.md
1ca46d7d0 <Zhiyuan Zheng> Remove debug infos & add tests for create external table.
babecc6cd <Zhiyuan Zheng> add repl for DataFusion
---
rust/.gitignore | 2 +
rust/datafusion/Cargo.toml | 8 +-
rust/datafusion/Dockerfile | 24 +++
rust/datafusion/README.md | 31 ++++
rust/datafusion/src/bin/repl.rs | 203 +++++++++++++++++++++
rust/datafusion/src/execution/context.rs | 103 ++++++++++-
rust/datafusion/src/execution/mod.rs | 1 +
rust/datafusion/src/execution/scalar_relation.rs | 65 +++++++
rust/datafusion/src/logicalplan.rs | 18 ++
.../src/optimizer/projection_push_down.rs | 13 ++
rust/datafusion/src/sql/parser.rs | 24 ++-
rust/datafusion/tests/sql.rs | 53 ++++++
12 files changed, 524 insertions(+), 21 deletions(-)
diff --git a/rust/.gitignore b/rust/.gitignore
index fa8d85a..6580b8e 100644
--- a/rust/.gitignore
+++ b/rust/.gitignore
@@ -1,2 +1,4 @@
Cargo.lock
target
+
+.history
\ No newline at end of file
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 8dcb657..202a98b 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -34,15 +34,21 @@ edition = "2018"
name = "datafusion"
path = "src/lib.rs"
+[[bin]]
+name = "datafusion-cli"
+path = "src/bin/repl.rs"
+
[dependencies]
fnv = "1.0.3"
arrow = { path = "../arrow" }
parquet = { path = "../parquet" }
-datafusion-rustyline = "2.0.0-alpha-20180628"
serde = { version = "1.0.80", features = ["rc"] }
serde_derive = "1.0.80"
serde_json = "1.0.33"
sqlparser = "0.2.0"
+clap = "2.33.0"
+rustyline = "3.0.0"
+prettytable-rs = "0.8.0"
[dev-dependencies]
criterion = "0.2.0"
diff --git a/rust/datafusion/Dockerfile b/rust/datafusion/Dockerfile
new file mode 100644
index 0000000..8b16313
--- /dev/null
+++ b/rust/datafusion/Dockerfile
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM rustlang/rust:nightly
+
+# Build the REPL from the copied Rust workspace and install it onto the PATH.
+WORKDIR /arrow/rust/datafusion
+COPY rust /arrow/rust/
+RUN cargo install --path . --bin datafusion-cli
+
+# Queries resolve files relative to /data; mount a host directory there.
+CMD ["datafusion-cli", "--data-path", "/data"]
diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md
index 925a0d2..654f5ba 100644
--- a/rust/datafusion/README.md
+++ b/rust/datafusion/README.md
@@ -23,6 +23,8 @@ DataFusion is an in-memory query engine that uses Apache Arrow as the memory mod
## Usage
+
+#### Use as a library
Add this to your Cargo.toml:
```toml
@@ -30,6 +32,35 @@ Add this to your Cargo.toml:
datafusion = "0.14.0-SNAPSHOT"
```
+#### Use as a binary
+##### Build your own binary (requires the Rust toolchain)
+```sh
+git clone https://github.com/apache/arrow
+cd arrow/rust/datafusion
+cargo run --bin datafusion-cli
+```
+##### Use Dockerfile
+```sh
+git clone https://github.com/apache/arrow
+cd arrow
+docker build -f rust/datafusion/Dockerfile . --tag datafusion-cli
+docker run -it -v /path/to/your/data:/data datafusion-cli
+```
+
+```
+USAGE:
+ datafusion-cli [OPTIONS]
+
+FLAGS:
+ -h, --help Prints help information
+ -V, --version Prints version information
+
+OPTIONS:
+ -c, --batch-size <batch-size> The batch size of each query, default value is 1048576
+ -p, --data-path <data-path> Path to your data, default to current directory
+```
+
+
# Status
## General
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
new file mode 100644
index 0000000..ac73b26
--- /dev/null
+++ b/rust/datafusion/src/bin/repl.rs
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate clap;
+
+use arrow::array::*;
+use arrow::datatypes::{DataType, TimeUnit};
+use clap::{App, Arg};
+use datafusion::error::{ExecutionError, Result};
+use datafusion::execution::context::ExecutionContext;
+use datafusion::execution::relation::Relation;
+use prettytable::{Cell, Row, Table};
+use rustyline::Editor;
+use std::cell::RefMut;
+use std::env;
+use std::path::Path;
+
+/// Entry point of the DataFusion REPL.
+///
+/// Parses the command line, optionally changes into the data directory, then reads
+/// SQL from the terminal. Input lines are accumulated until one ends with `;`, and the
+/// accumulated statement is executed and its result printed. Invalid command-line
+/// values are reported on stderr and terminate the process with exit code 1.
+fn main() {
+    let matches = App::new("DataFusion")
+        .version(crate_version!())
+        .about(
+            "DataFusion is an in-memory query engine that uses Apache Arrow \
+             as the memory model. It supports executing SQL queries against CSV and \
+             Parquet files as well as querying directly against in-memory data.",
+        )
+        .arg(
+            Arg::with_name("data-path")
+                .help("Path to your data, default to current directory")
+                .short("p")
+                .long("data-path")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("batch-size")
+                .help("The batch size of each query, default value is 1048576")
+                .short("c")
+                .long("batch-size")
+                .takes_value(true),
+        )
+        .get_matches();
+
+    // Both options come straight from the user, so report bad values instead of
+    // panicking with an unwrap backtrace.
+    if let Some(path) = matches.value_of("data-path") {
+        if let Err(err) = env::set_current_dir(Path::new(path)) {
+            eprintln!("Cannot use data path '{}': {}", path, err);
+            std::process::exit(1);
+        }
+    }
+
+    let batch_size = match matches.value_of("batch-size") {
+        None => 1_048_576,
+        Some(size) => match size.parse::<usize>() {
+            Ok(size) if size > 0 => size,
+            _ => {
+                eprintln!("Invalid batch size '{}': expected a positive integer", size);
+                std::process::exit(1);
+            }
+        },
+    };
+
+    let mut ctx = ExecutionContext::new();
+
+    // The history file is relative, so it lives in the working directory
+    // (the data path, when one was given).
+    let mut rl = Editor::<()>::new();
+    rl.load_history(".history").ok();
+
+    // Accumulate input lines until one ends with ';', then run the whole statement.
+    let mut query = String::new();
+    loop {
+        match rl.readline("> ") {
+            Ok(ref line) if line.trim_end().ends_with(';') => {
+                query.push_str(line.trim_end());
+                rl.add_history_entry(query.clone());
+                if let Err(err) = exec_and_print(&mut ctx, query, batch_size) {
+                    println!("{:?}", err);
+                }
+                query = String::new();
+            }
+            Ok(ref line) => {
+                query.push_str(line);
+                query.push_str(" ");
+            }
+            // Any readline error (e.g. end of input) ends the session.
+            Err(_) => break,
+        }
+    }
+
+    rl.save_history(".history").ok();
+}
+
+/// Plans and executes `sql` in `ctx`, then prints every resulting row.
+///
+/// Planning, execution and formatting errors are all propagated to the caller.
+fn exec_and_print(
+    ctx: &mut ExecutionContext,
+    sql: String,
+    batch_size: usize,
+) -> Result<()> {
+    let relation = ctx.sql(&sql, batch_size)?;
+    {
+        // Hold the mutable borrow only while the result set is being drained.
+        let results = relation.borrow_mut();
+        print_result(results)?;
+    }
+    Ok(())
+}
+
+/// Drains `results` and prints all rows as a single table, followed by a row count.
+///
+/// The header row is built from the relation's schema. Errors raised while pulling
+/// batches or formatting a value are returned instead of panicking. In that case
+/// nothing is printed for the partially built table.
+fn print_result(mut results: RefMut<Relation>) -> Result<()> {
+    let mut table = Table::new();
+
+    let header: Vec<Cell> = results
+        .schema()
+        .fields()
+        .iter()
+        .map(|field| Cell::new(field.name()))
+        .collect();
+    table.add_row(Row::new(header));
+
+    let mut row_count = 0;
+    // `?` (not `unwrap`) so a failing query reports an error and the REPL keeps running.
+    while let Some(batch) = results.next()? {
+        row_count += batch.num_rows();
+        for row in 0..batch.num_rows() {
+            let cells = (0..batch.num_columns())
+                .map(|col| str_value(batch.column(col).clone(), row).map(|s| Cell::new(&s)))
+                .collect::<Result<Vec<_>>>()?;
+            table.add_row(Row::new(cells));
+        }
+    }
+    table.printstd();
+
+    // Only exactly one row is singular; zero rows is "0 rows".
+    if row_count == 1 {
+        println!("{} row in set.", row_count);
+    } else {
+        println!("{} rows in set.", row_count);
+    }
+
+    Ok(())
+}
+
+// Downcasts `$column` to the concrete array type `$array_type` and returns
+// `Ok(value.to_string())` for the element at `$row`.
+// The downcast is unwrapped: callers choose `$array_type` from the column's
+// `DataType`, so a mismatch is a programming error and panics.
+macro_rules! make_string {
+    ($array_type:ty, $column:ident, $row:ident) => {{
+        let typed_array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        Ok(typed_array.value($row).to_string())
+    }};
+}
+
+/// Renders the value at index `row` of `column` as text for the REPL's result table.
+///
+/// Null slots are rendered as `NULL`. Returns an `ExecutionError` for data types the
+/// REPL cannot display: Float16 (no concrete array type is used for it here), nested
+/// types, and date/time types with a unit outside the handled combinations.
+fn str_value(column: ArrayRef, row: usize) -> Result<String> {
+    // The Arrow format leaves the contents of a null slot unspecified, so check
+    // validity before reading the value buffer.
+    if column.is_null(row) {
+        return Ok("NULL".to_string());
+    }
+
+    match column.data_type() {
+        // Utf8 columns are backed by `BinaryArray` in this arrow version.
+        DataType::Utf8 => Ok(column
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap()
+            .get_string(row)),
+        DataType::Boolean => make_string!(BooleanArray, column, row),
+        DataType::Int8 => make_string!(Int8Array, column, row),
+        DataType::Int16 => make_string!(Int16Array, column, row),
+        DataType::Int32 => make_string!(Int32Array, column, row),
+        DataType::Int64 => make_string!(Int64Array, column, row),
+        DataType::UInt8 => make_string!(UInt8Array, column, row),
+        DataType::UInt16 => make_string!(UInt16Array, column, row),
+        DataType::UInt32 => make_string!(UInt32Array, column, row),
+        DataType::UInt64 => make_string!(UInt64Array, column, row),
+        DataType::Float32 => make_string!(Float32Array, column, row),
+        DataType::Float64 => make_string!(Float64Array, column, row),
+        DataType::Timestamp(TimeUnit::Second) => {
+            make_string!(TimestampSecondArray, column, row)
+        }
+        DataType::Timestamp(TimeUnit::Millisecond) => {
+            make_string!(TimestampMillisecondArray, column, row)
+        }
+        DataType::Timestamp(TimeUnit::Microsecond) => {
+            make_string!(TimestampMicrosecondArray, column, row)
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond) => {
+            make_string!(TimestampNanosecondArray, column, row)
+        }
+        DataType::Date32(_) => make_string!(Date32Array, column, row),
+        DataType::Date64(_) => make_string!(Date64Array, column, row),
+        DataType::Time32(TimeUnit::Second) => make_string!(Time32SecondArray, column, row),
+        DataType::Time32(TimeUnit::Millisecond) => {
+            make_string!(Time32MillisecondArray, column, row)
+        }
+        DataType::Time64(TimeUnit::Microsecond) => {
+            make_string!(Time64MicrosecondArray, column, row)
+        }
+        DataType::Time64(TimeUnit::Nanosecond) => {
+            make_string!(Time64NanosecondArray, column, row)
+        }
+        _ => Err(ExecutionError::ExecutionError(format!(
+            "Unsupported {:?} type for repl.",
+            column.data_type()
+        ))),
+    }
+}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 1618e6a..1bebdb5 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -25,6 +25,8 @@ use std::sync::Arc;
use arrow::datatypes::*;
+use crate::arrow::array::ArrayRef;
+use crate::arrow::builder::BooleanBuilder;
use crate::datasource::csv::CsvFile;
use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
@@ -34,15 +36,18 @@ use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::projection::ProjectRelation;
use crate::execution::relation::{DataSourceRelation, Relation};
+use crate::execution::scalar_relation::ScalarRelation;
use crate::execution::table_impl::TableImpl;
use crate::logicalplan::*;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::type_coercion::TypeCoercionRule;
use crate::optimizer::utils;
+use crate::sql::parser::FileType;
use crate::sql::parser::{DFASTNode, DFParser};
use crate::sql::planner::{SchemaProvider, SqlToRel};
use crate::table::Table;
+use sqlparser::sqlast::{SQLColumnDef, SQLType};
/// Execution context for registering data sources and executing queries
pub struct ExecutionContext {
@@ -50,7 +55,7 @@ pub struct ExecutionContext {
}
impl ExecutionContext {
- /// Create a new excution context for in-memory queries
+ /// Create a new execution context for in-memory queries
pub fn new() -> Self {
Self {
datasources: Rc::new(RefCell::new(HashMap::new())),
@@ -83,9 +88,61 @@ impl ExecutionContext {
Ok(self.optimize(&plan)?)
}
- other => Err(ExecutionError::General(format!(
- "Cannot create logical plan from {:?}",
- other
+ DFASTNode::CreateExternalTable {
+ name,
+ columns,
+ file_type,
+ header_row,
+ location,
+ } => {
+ let schema = Arc::new(self.build_schema(columns)?);
+
+ Ok(Arc::new(LogicalPlan::CreateExternalTable {
+ schema,
+ name,
+ location,
+ file_type,
+ header_row,
+ }))
+ }
+ }
+ }
+
+    /// Converts the column definitions of a `CREATE EXTERNAL TABLE` statement into an
+    /// Arrow schema, keeping column order and each column's nullability.
+    ///
+    /// Fails with the first column whose SQL type cannot be mapped.
+    fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
+        let fields = columns
+            .into_iter()
+            .map(|column| -> Result<Field> {
+                let data_type = self.make_data_type(column.data_type)?;
+                Ok(Field::new(&column.name, data_type, column.allow_null))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Schema::new(fields))
+    }
+
+    /// Maps a SQL column type from a `CREATE EXTERNAL TABLE` statement to an Arrow type.
+    ///
+    /// Returns `ExecutionError::General` for SQL types with no mapping here.
+    fn make_data_type(&self, sql_type: SQLType) -> Result<DataType> {
+        match sql_type {
+            SQLType::BigInt => Ok(DataType::Int64),
+            SQLType::Int => Ok(DataType::Int32),
+            SQLType::SmallInt => Ok(DataType::Int16),
+            SQLType::Char(_) | SQLType::Varchar(_) | SQLType::Text => Ok(DataType::Utf8),
+            // NOTE(review): DECIMAL is approximated by a 64-bit float, which is lossy.
+            SQLType::Decimal(_, _) => Ok(DataType::Float64),
+            SQLType::Float(_) => Ok(DataType::Float32),
+            SQLType::Real | SQLType::Double => Ok(DataType::Float64),
+            SQLType::Boolean => Ok(DataType::Boolean),
+            // Arrow's day-granularity date type is Date32; Date64 counts milliseconds.
+            SQLType::Date => Ok(DataType::Date32(DateUnit::Day)),
+            // Millisecond time-of-day belongs in Time32; Time64 only allows the
+            // microsecond and nanosecond units.
+            SQLType::Time => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            // A SQL TIMESTAMP is a point in time, not a calendar date.
+            SQLType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Millisecond)),
+            SQLType::Uuid
+            | SQLType::Clob(_)
+            | SQLType::Binary(_)
+            | SQLType::Varbinary(_)
+            | SQLType::Blob(_)
+            | SQLType::Regclass
+            | SQLType::Bytea
+            | SQLType::Custom(_)
+            | SQLType::Array(_) => Err(ExecutionError::General(format!(
+                "Unsupported data type: {:?}.",
+                sql_type
))),
}
}
@@ -110,7 +167,7 @@ impl ExecutionContext {
/// Get a table by name
pub fn table(&mut self, table_name: &str) -> Result<Arc<Table>> {
- match self.datasources.borrow().get(table_name) {
+ match (*self.datasources).borrow().get(table_name) {
Some(provider) => {
Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan {
schema_name: "".to_string(),
@@ -151,7 +208,7 @@ impl ExecutionContext {
ref table_name,
ref projection,
..
- } => match self.datasources.borrow().get(table_name) {
+ } => match (*self.datasources).borrow().get(table_name) {
Some(provider) => {
let ds = provider.scan(projection, batch_size)?;
if ds.len() == 1 {
@@ -277,6 +334,38 @@ impl ExecutionContext {
}
}
+ LogicalPlan::CreateExternalTable {
+ ref schema,
+ ref name,
+ ref location,
+ ref file_type,
+ ref header_row,
+ } => {
+ match file_type {
+ FileType::CSV => {
+ self.register_csv(name, location, schema, *header_row)
+ }
+ _ => {
+ return Err(ExecutionError::ExecutionError(format!(
+ "Unsupported file type {:?}.",
+ file_type
+ )));
+ }
+ }
+ let mut builder = BooleanBuilder::new(1);
+ builder.append_value(true)?;
+
+ let columns = vec![Arc::new(builder.finish()) as ArrayRef];
+ Ok(Rc::new(RefCell::new(ScalarRelation::new(
+ Arc::new(Schema::new(vec![Field::new(
+ "result",
+ DataType::Boolean,
+ false,
+ )])),
+ columns,
+ ))))
+ }
+
_ => Err(ExecutionError::NotImplemented(
"Unsupported logical plan for execution".to_string(),
)),
@@ -289,7 +378,7 @@ struct ExecutionContextSchemaProvider {
}
impl SchemaProvider for ExecutionContextSchemaProvider {
fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> {
- match self.datasources.borrow().get(name) {
+ match (*self.datasources).borrow().get(name) {
Some(ds) => Some(ds.schema().clone()),
None => None,
}
diff --git a/rust/datafusion/src/execution/mod.rs b/rust/datafusion/src/execution/mod.rs
index cfd748a..d4f57a7 100644
--- a/rust/datafusion/src/execution/mod.rs
+++ b/rust/datafusion/src/execution/mod.rs
@@ -24,4 +24,5 @@ pub mod filter;
pub mod limit;
pub mod projection;
pub mod relation;
+pub mod scalar_relation;
pub mod table_impl;
diff --git a/rust/datafusion/src/execution/scalar_relation.rs b/rust/datafusion/src/execution/scalar_relation.rs
new file mode 100644
index 0000000..96e3118
--- /dev/null
+++ b/rust/datafusion/src/execution/scalar_relation.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Scalar relation, which emits a single, fixed record batch.
+
+use crate::error::Result;
+use crate::execution::relation::Relation;
+use arrow::array::ArrayRef;
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use std::sync::Arc;
+
+/// A relation that emits exactly one record batch, built from fixed column arrays.
+pub(super) struct ScalarRelation {
+    /// The schema of the emitted batch
+    schema: Arc<Schema>,
+
+    /// The column arrays of the emitted batch, in schema field order
+    value: Vec<ArrayRef>,
+
+    /// Whether the batch has already been handed out by `next`
+    emitted: bool,
+}
+
+impl ScalarRelation {
+    /// Creates a relation that will yield `value` as one batch described by `schema`.
+    ///
+    /// Column/schema consistency is only checked when the batch is built in `next`.
+    pub fn new(schema: Arc<Schema>, value: Vec<ArrayRef>) -> Self {
+        let emitted = false;
+        ScalarRelation {
+            schema,
+            value,
+            emitted,
+        }
+    }
+}
+
+impl Relation for ScalarRelation {
+    /// Returns the stored batch on the first call and `None` on every later call.
+    ///
+    /// Errors from `RecordBatch::try_new` (e.g. columns not matching the schema) are
+    /// propagated.
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        // Mark as emitted before building, so a failed construction is not retried.
+        if std::mem::replace(&mut self.emitted, true) {
+            return Ok(None);
+        }
+        let batch = RecordBatch::try_new(Arc::clone(&self.schema), self.value.clone())?;
+        Ok(Some(batch))
+    }
+
+    /// The schema of the single emitted batch.
+    fn schema(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+}
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index c2ac37b..8e69056 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use crate::error::{ExecutionError, Result};
use crate::optimizer::utils;
+use crate::sql::parser::FileType;
/// Enumeration of supported function types (Scalar and Aggregate)
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -441,6 +442,19 @@ pub enum LogicalPlan {
/// The schema description
schema: Arc<Schema>,
},
+ /// Represents a create external table expression.
+ CreateExternalTable {
+ /// The table schema
+ schema: Arc<Schema>,
+ /// The table name
+ name: String,
+ /// The physical location
+ location: String,
+ /// The file type of physical file
+ file_type: FileType,
+ /// Whether the CSV file contains a header
+ header_row: bool,
+ },
}
impl LogicalPlan {
@@ -454,6 +468,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { schema, .. } => &schema,
LogicalPlan::Limit { schema, .. } => &schema,
+ LogicalPlan::CreateExternalTable { schema, .. } => &schema,
}
}
}
@@ -530,6 +545,9 @@ impl LogicalPlan {
write!(f, "Limit: {:?}", expr)?;
input.fmt_with_indent(f, indent + 1)
}
+ LogicalPlan::CreateExternalTable { ref name, .. } => {
+ write!(f, "CreateExternalTable: {:?}", name)
+ }
}
}
}
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index a75982f..52ac440 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -191,6 +191,19 @@ impl ProjectionPushDown {
input: input.clone(),
schema: schema.clone(),
})),
+ LogicalPlan::CreateExternalTable {
+ schema,
+ name,
+ location,
+ file_type,
+ header_row,
+ } => Ok(Arc::new(LogicalPlan::CreateExternalTable {
+ schema: schema.clone(),
+ name: name.to_string(),
+ location: location.to_string(),
+ file_type: file_type.clone(),
+ header_row: *header_row,
+ })),
}
}
diff --git a/rust/datafusion/src/sql/parser.rs b/rust/datafusion/src/sql/parser.rs
index dc46ce0..74ae4cf 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -32,7 +32,7 @@ macro_rules! parser_err {
}
/// Types of files to parse as DataFrames
-#[derive(Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum FileType {
/// Newline-delimited JSON
NdJson,
@@ -134,18 +134,16 @@ impl DFParser {
true
};
- match self.parser.peek_token() {
- Some(Token::Comma) => {
- self.parser.next_token();
- columns.push(SQLColumnDef {
- name: column_name,
- data_type: data_type,
- allow_null,
- default: None,
- is_primary: false,
- is_unique: false,
- });
- }
+ columns.push(SQLColumnDef {
+ name: column_name,
+ data_type: data_type,
+ allow_null,
+ default: None,
+ is_primary: false,
+ is_unique: false,
+ });
+ match self.parser.next_token() {
+ Some(Token::Comma) => continue,
Some(Token::RParen) => break,
_ => {
return parser_err!(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 81c52c9..566fecf 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -241,6 +241,26 @@ fn csv_query_limit_zero() {
assert_eq!(expected, actual);
}
+#[test]
+fn csv_query_create_external_table() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+
+    // NOTE(review): the projection selects the literal `10` where `c10` might be
+    // expected; the expected row below matches the literal. Confirm this is intended.
+    let actual = execute(
+        &mut ctx,
+        "SELECT c1, c2, c3, c4, c5, c6, c7, c8, c9, 10, c11, c12, c13 FROM aggregate_test_100 LIMIT 1",
+    );
+    assert_eq!(
+        "\"c\"\t2\t1\t18109\t2033001162\t-6513304855495910254\t25\t43062\t1491205016\t10\t0.110830784\t0.9294097332465232\t\"6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW\"\n".to_string(),
+        actual
+    );
+}
+
+#[test]
+fn csv_query_external_table_count() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+
+    // The table registered via CREATE EXTERNAL TABLE must expose all 100 rows.
+    let expected = String::from("100\n");
+    let actual = execute(&mut ctx, "SELECT COUNT(c12) FROM aggregate_test_100");
+    assert_eq!(expected, actual);
+}
+
//TODO Uncomment the following test when ORDER BY is implemented to be able to test ORDER
// BY + LIMIT
/*
@@ -273,6 +293,39 @@ fn aggr_test_schema() -> Arc<Schema> {
]))
}
+/// Registers `aggregate_test_100` in `ctx` by running a `CREATE EXTERNAL TABLE`
+/// statement against the CSV file under `$ARROW_TEST_DATA/csv`.
+///
+/// Panics if `ARROW_TEST_DATA` is unset or the statement fails.
+fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
+    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+
+    // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once unsigned is supported.
+    let ddl = format!(
+        "
+ CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 INT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT NOT NULL,
+ c5 INT NOT NULL,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+ STORED AS CSV
+ WITH HEADER ROW
+ LOCATION '{}/csv/aggregate_test_100.csv'
+ ",
+        testdata
+    );
+    ctx.sql(&ddl, 1024).unwrap();
+}
+
fn register_aggregate_csv(ctx: &mut ExecutionContext) {
let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
let schema = aggr_test_schema();