You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/09 10:59:26 UTC

[arrow-datafusion] branch master updated: Add print format param and support for csv print format to datafusion cli (#289)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 204d4f5  Add print format param and support for csv print format to datafusion cli (#289)
204d4f5 is described below

commit 204d4f588a5820f25fc5c6d6599d368c1ee04c3e
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun May 9 18:59:19 2021 +0800

    Add print format param and support for csv print format to datafusion cli (#289)
    
    * add csv mode to datafusion cli
    
    * add license
    
    * fix per comments
    
    * update help
---
 datafusion-cli/Cargo.toml                 |  1 +
 datafusion-cli/src/format.rs              | 17 ++++++++
 datafusion-cli/src/format/print_format.rs | 64 +++++++++++++++++++++++++++++++
 datafusion-cli/src/main.rs                | 50 +++++++++++++++++++-----
 4 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 883d0f2..2cde4da 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,3 +31,4 @@ clap = "2.33"
 rustyline = "8.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 datafusion = { path = "../datafusion" }
+arrow = { git = "https://github.com/apache/arrow-rs", rev = "508f25c10032857da34ea88cc8166f0741616a32" }
diff --git a/datafusion-cli/src/format.rs b/datafusion-cli/src/format.rs
new file mode 100644
index 0000000..c5da78f
--- /dev/null
+++ b/datafusion-cli/src/format.rs
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+pub mod print_format;
diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs
new file mode 100644
index 0000000..921e29f
--- /dev/null
+++ b/datafusion-cli/src/format/print_format.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Print format variants
+use datafusion::arrow::csv::writer::WriterBuilder;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::arrow::util::pretty;
+use datafusion::error::{DataFusionError, Result};
+use std::str::FromStr;
+
+/// Allow records to be printed in different formats
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum PrintFormat {
+    /// Comma-separated values, preceded by a header row
+    Csv,
+    /// Tabular text rendered by arrow's `pretty` printer
+    Table,
+}
+
+impl FromStr for PrintFormat {
+    type Err = ();
+
+    /// Parse a format name (`csv` or `table`), ignoring case.
+    ///
+    /// Case-insensitive matching keeps this in agreement with the CLI's
+    /// `--format` validator, which also ignores case, so any value that
+    /// passes validation parses successfully.
+    fn from_str(s: &str) -> std::result::Result<PrintFormat, ()> {
+        match s.to_lowercase().as_str() {
+            "csv" => Ok(PrintFormat::Csv),
+            "table" => Ok(PrintFormat::Table),
+            _ => Err(()),
+        }
+    }
+}
+
+impl PrintFormat {
+    /// Print the batches to stdout using the specified format.
+    ///
+    /// # Errors
+    /// Returns an error if a batch cannot be written/rendered, or if the CSV
+    /// output is not valid UTF-8.
+    pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
+        match self {
+            PrintFormat::Csv => {
+                let mut bytes = vec![];
+                {
+                    // Inner scope drops the writer, ending its mutable borrow
+                    // of `bytes`, before the buffer is read below.
+                    let builder = WriterBuilder::new().has_headers(true);
+                    let mut writer = builder.build(&mut bytes);
+                    for batch in batches {
+                        writer.write(batch)?;
+                    }
+                }
+                let csv = String::from_utf8(bytes)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                // Each CSV record already ends in a newline; `println!` would
+                // add a spurious blank line after the output.
+                print!("{}", csv);
+            }
+            PrintFormat::Table => pretty::print_batches(batches)?,
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 2e8fe11..05c6766 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,10 +16,13 @@
 // under the License.
 
 #![allow(bare_trait_objects)]
+
+mod format;
+
 use clap::{crate_version, App, Arg};
-use datafusion::arrow::util::pretty;
 use datafusion::error::Result;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use format::print_format::PrintFormat;
 use rustyline::Editor;
 use std::env;
 use std::fs::File;
@@ -55,12 +58,20 @@ pub async fn main() {
         )
         .arg(
             Arg::with_name("file")
-                .help("execute commands from file, then exit")
+                .help("Execute commands from file, then exit")
                 .short("f")
                 .long("file")
                 .validator(is_valid_file)
                 .takes_value(true),
         )
+        .arg(
+            Arg::with_name("format")
+                .help("Output format (possible values: table, csv)")
+                .long("format")
+                .default_value("table")
+                .validator(is_valid_format)
+                .takes_value(true),
+        )
         .get_matches();
 
     if let Some(path) = matches.value_of("data-path") {
@@ -77,19 +88,26 @@ pub async fn main() {
         execution_config = execution_config.with_batch_size(batch_size);
     };
 
+    let print_format = matches
+        .value_of("format")
+        .expect("No format is specified")
+        .parse::<PrintFormat>()
+        .expect("Invalid format");
+
     if let Some(file_path) = matches.value_of("file") {
         let file = File::open(file_path)
             .unwrap_or_else(|err| panic!("cannot open file '{}': {}", file_path, err));
         let mut reader = BufReader::new(file);
-        exec_from_lines(&mut reader, execution_config).await;
+        exec_from_lines(&mut reader, execution_config, print_format).await;
     } else {
-        exec_from_repl(execution_config).await;
+        exec_from_repl(execution_config, print_format).await;
     }
 }
 
 async fn exec_from_lines(
     reader: &mut BufReader<File>,
     execution_config: ExecutionConfig,
+    print_format: PrintFormat,
 ) {
     let mut ctx = ExecutionContext::with_config(execution_config);
     let mut query = "".to_owned();
@@ -100,7 +118,7 @@ async fn exec_from_lines(
                 let line = line.trim_end();
                 query.push_str(line);
                 if line.ends_with(';') {
-                    match exec_and_print(&mut ctx, query).await {
+                    match exec_and_print(&mut ctx, print_format.clone(), query).await {
                         Ok(_) => {}
                         Err(err) => println!("{:?}", err),
                     }
@@ -117,14 +135,14 @@ async fn exec_from_lines(
 
     // run the left over query if the last statement doesn't contain ‘;’
     if !query.is_empty() {
-        match exec_and_print(&mut ctx, query).await {
+        match exec_and_print(&mut ctx, print_format, query).await {
             Ok(_) => {}
             Err(err) => println!("{:?}", err),
         }
     }
 }
 
-async fn exec_from_repl(execution_config: ExecutionConfig) {
+async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFormat) {
     let mut ctx = ExecutionContext::with_config(execution_config);
 
     let mut rl = Editor::<()>::new();
@@ -139,7 +157,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig) {
             Ok(ref line) if line.trim_end().ends_with(';') => {
                 query.push_str(line.trim_end());
                 rl.add_history_entry(query.clone());
-                match exec_and_print(&mut ctx, query).await {
+                match exec_and_print(&mut ctx, print_format.clone(), query).await {
                     Ok(_) => {}
                     Err(err) => println!("{:?}", err),
                 }
@@ -158,6 +176,14 @@ async fn exec_from_repl(execution_config: ExecutionConfig) {
     rl.save_history(".history").ok();
 }
 
+/// Validate `--format`; delegates to `PrintFormat`'s parser so the two cannot disagree.
+fn is_valid_format(format: String) -> std::result::Result<(), String> {
+    match format.parse::<PrintFormat>() {
+        Ok(_) => Ok(()),
+        Err(_) => Err(format!("Format '{}' not supported", format)),
+    }
+}
+
 fn is_valid_file(dir: String) -> std::result::Result<(), String> {
     if Path::new(&dir).is_file() {
         Ok(())
@@ -186,7 +212,11 @@ fn is_exit_command(line: &str) -> bool {
     line == "quit" || line == "exit"
 }
 
-async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
+async fn exec_and_print(
+    ctx: &mut ExecutionContext,
+    print_format: PrintFormat,
+    sql: String,
+) -> Result<()> {
     let now = Instant::now();
 
     let df = ctx.sql(&sql)?;
@@ -200,7 +230,7 @@ async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
         return Ok(());
     }
 
-    pretty::print_batches(&results)?;
+    print_format.print_batches(&results)?;
 
     let row_count: usize = results.iter().map(|b| b.num_rows()).sum();