You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/09 10:59:26 UTC
[arrow-datafusion] branch master updated: Add print format param and support for csv print format to datafusion cli (#289)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 204d4f5 Add print format param and support for csv print format to datafusion cli (#289)
204d4f5 is described below
commit 204d4f588a5820f25fc5c6d6599d368c1ee04c3e
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun May 9 18:59:19 2021 +0800
Add print format param and support for csv print format to datafusion cli (#289)
* add csv mode to datafusion cli
* add license
* fix per comments
* update help
---
datafusion-cli/Cargo.toml | 1 +
datafusion-cli/src/format.rs | 17 ++++++++
datafusion-cli/src/format/print_format.rs | 64 +++++++++++++++++++++++++++++++
datafusion-cli/src/main.rs | 50 +++++++++++++++++++-----
4 files changed, 122 insertions(+), 10 deletions(-)
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 883d0f2..2cde4da 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,3 +31,4 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion" }
+arrow = { git = "https://github.com/apache/arrow-rs", rev = "508f25c10032857da34ea88cc8166f0741616a32" }
diff --git a/datafusion-cli/src/format.rs b/datafusion-cli/src/format.rs
new file mode 100644
index 0000000..c5da78f
--- /dev/null
+++ b/datafusion-cli/src/format.rs
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+pub mod print_format;
diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs
new file mode 100644
index 0000000..921e29f
--- /dev/null
+++ b/datafusion-cli/src/format/print_format.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Print format variants
+use arrow::csv::writer::WriterBuilder;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::arrow::util::pretty;
+use datafusion::error::{DataFusionError, Result};
+use std::str::FromStr;
+
+/// Output format for printing query results (chosen with the `--format` CLI flag)
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum PrintFormat {
+    Csv, // comma-separated values, written with a header row
+    Table, // rendered by arrow's `pretty::print_batches`
+}
+
+impl FromStr for PrintFormat {
+    type Err = ();
+    fn from_str(s: &str) -> std::result::Result<PrintFormat, ()> {
+        match s.to_lowercase().as_str() { // case-insensitive, like `is_valid_format`
+            "csv" => Ok(PrintFormat::Csv),
+            "table" => Ok(PrintFormat::Table),
+            _ => Err(()),
+        }
+    }
+}
+
+impl PrintFormat {
+    /// Print `batches` to stdout in this format.
+    pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
+        match self {
+            PrintFormat::Csv => {
+                let mut bytes = vec![];
+                {
+                    // Inner scope drops the writer (ending its borrow) before `bytes` is read.
+                    let mut writer = WriterBuilder::new().has_headers(true).build(&mut bytes);
+                    for batch in batches {
+                        writer.write(batch)?;
+                    }
+                }
+                let csv = String::from_utf8(bytes)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                print!("{}", csv); // each record already ends in '\n': no extra blank line
+            }
+            PrintFormat::Table => pretty::print_batches(batches)?,
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 2e8fe11..05c6766 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,10 +16,13 @@
// under the License.
#![allow(bare_trait_objects)]
+
+mod format;
+
use clap::{crate_version, App, Arg};
-use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use format::print_format::PrintFormat;
use rustyline::Editor;
use std::env;
use std::fs::File;
@@ -55,12 +58,20 @@ pub async fn main() {
)
.arg(
Arg::with_name("file")
- .help("execute commands from file, then exit")
+ .help("Execute commands from file, then exit")
.short("f")
.long("file")
.validator(is_valid_file)
.takes_value(true),
)
+ .arg(
+ Arg::with_name("format")
+ .help("Output format (possible values: table, csv)")
+ .long("format")
+ .default_value("table")
+ .validator(is_valid_format)
+ .takes_value(true),
+ )
.get_matches();
if let Some(path) = matches.value_of("data-path") {
@@ -77,19 +88,26 @@ pub async fn main() {
execution_config = execution_config.with_batch_size(batch_size);
};
+ let print_format = matches
+ .value_of("format")
+ .expect("No format is specified")
+ .parse::<PrintFormat>()
+ .expect("Invalid format");
+
if let Some(file_path) = matches.value_of("file") {
let file = File::open(file_path)
.unwrap_or_else(|err| panic!("cannot open file '{}': {}", file_path, err));
let mut reader = BufReader::new(file);
- exec_from_lines(&mut reader, execution_config).await;
+ exec_from_lines(&mut reader, execution_config, print_format).await;
} else {
- exec_from_repl(execution_config).await;
+ exec_from_repl(execution_config, print_format).await;
}
}
async fn exec_from_lines(
reader: &mut BufReader<File>,
execution_config: ExecutionConfig,
+ print_format: PrintFormat,
) {
let mut ctx = ExecutionContext::with_config(execution_config);
let mut query = "".to_owned();
@@ -100,7 +118,7 @@ async fn exec_from_lines(
let line = line.trim_end();
query.push_str(line);
if line.ends_with(';') {
- match exec_and_print(&mut ctx, query).await {
+ match exec_and_print(&mut ctx, print_format.clone(), query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
@@ -117,14 +135,14 @@ async fn exec_from_lines(
// run the left over query if the last statement doesn't contain ‘;’
if !query.is_empty() {
- match exec_and_print(&mut ctx, query).await {
+ match exec_and_print(&mut ctx, print_format, query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
}
}
-async fn exec_from_repl(execution_config: ExecutionConfig) {
+async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFormat) {
let mut ctx = ExecutionContext::with_config(execution_config);
let mut rl = Editor::<()>::new();
@@ -139,7 +157,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig) {
Ok(ref line) if line.trim_end().ends_with(';') => {
query.push_str(line.trim_end());
rl.add_history_entry(query.clone());
- match exec_and_print(&mut ctx, query).await {
+ match exec_and_print(&mut ctx, print_format.clone(), query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
@@ -158,6 +176,14 @@ async fn exec_from_repl(execution_config: ExecutionConfig) {
rl.save_history(".history").ok();
}
+fn is_valid_format(format: String) -> std::result::Result<(), String> {
+    // Delegate to `PrintFormat::from_str` so validation and parsing can never disagree.
+    format
+        .parse::<PrintFormat>()
+        .map(|_| ())
+        .map_err(|_| format!("Format '{}' not supported", format))
+}
+
fn is_valid_file(dir: String) -> std::result::Result<(), String> {
if Path::new(&dir).is_file() {
Ok(())
@@ -186,7 +212,11 @@ fn is_exit_command(line: &str) -> bool {
line == "quit" || line == "exit"
}
-async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
+async fn exec_and_print(
+ ctx: &mut ExecutionContext,
+ print_format: PrintFormat,
+ sql: String,
+) -> Result<()> {
let now = Instant::now();
let df = ctx.sql(&sql)?;
@@ -200,7 +230,7 @@ async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
return Ok(());
}
- pretty::print_batches(&results)?;
+ print_format.print_batches(&results)?;
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();