You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/24 12:43:34 UTC

[arrow-datafusion] branch master updated: [DataFusion] - Add show and show_limit function for DataFrame (#923)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5871207  [DataFusion] -  Add show and show_limit function for DataFrame (#923)
5871207 is described below

commit 58712078391502b740cf1f92f960edf4d1721187
Author: Francis Du <fr...@francisdu.com>
AuthorDate: Tue Aug 24 20:43:27 2021 +0800

    [DataFusion] -  Add show and show_limit function for DataFrame (#923)
    
    * feat: support show function for DataFrame
    
    * fix: fix docs comments
    
    * fix: fix typo
    
    * fix: fix pre-commit
    
    * fix: fix code format
    
    * fix: improve show function implementation
    
    * fix: change match pattern to 'if let' single pattern
    
    * fix: Rewrite show function impl and add a new show_limit function
    
    * fix: Add the show function to the sample code
    
    * fix: fix cargo test error
---
 README.md                                          |  9 +++----
 ballista-examples/src/bin/ballista-dataframe.rs    |  5 ++--
 ballista-examples/src/bin/ballista-sql.rs          |  5 ++--
 datafusion-examples/examples/csv_sql.rs            |  5 +---
 datafusion-examples/examples/dataframe.rs          |  7 +----
 .../examples/dataframe_in_memory.rs                |  6 +----
 datafusion-examples/examples/flight_client.rs      |  2 +-
 datafusion-examples/examples/parquet_sql.rs        |  5 +---
 datafusion-examples/examples/simple_udf.rs         |  6 +----
 datafusion/src/dataframe.rs                        | 30 ++++++++++++++++++++++
 datafusion/src/execution/dataframe_impl.rs         | 13 ++++++++++
 docs/user-guide/src/example-usage.md               |  9 +++----
 pre-commit.sh                                      |  6 ++---
 13 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/README.md b/README.md
index 0b1b679..ed2788c 100644
--- a/README.md
+++ b/README.md
@@ -82,8 +82,7 @@ async fn main() -> datafusion::error::Result<()> {
   let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
 
   // execute and print results
-  let results: Vec<RecordBatch> = df.collect().await?;
-  print_batches(&results)?;
+  df.show().await?;
   Ok(())
 }
 ```
@@ -102,12 +101,10 @@ async fn main() -> datafusion::error::Result<()> {
   let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
 
   let df = df.filter(col("a").lt_eq(col("b")))?
-          .aggregate(vec![col("a")], vec![min(col("b"))])?
-          .limit(100)?;
+          .aggregate(vec![col("a")], vec![min(col("b"))])?;
 
   // execute and print results
-  let results: Vec<RecordBatch> = df.collect().await?;
-  print_batches(&results)?;
+  df.show_limit(100).await?;
   Ok(())
 }
 ```
diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
index 693e676..434ed7b 100644
--- a/ballista-examples/src/bin/ballista-dataframe.rs
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use ballista::prelude::*;
-use datafusion::arrow::util::pretty;
 use datafusion::prelude::{col, lit};
 
 /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
@@ -38,8 +37,8 @@ async fn main() -> Result<()> {
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
 
-    let results = df.collect().await?;
-    pretty::print_batches(&results)?;
+    // print the results
+    df.show().await?;
 
     Ok(())
 }
diff --git a/ballista-examples/src/bin/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs
index 590ab7b..4b303e3 100644
--- a/ballista-examples/src/bin/ballista-sql.rs
+++ b/ballista-examples/src/bin/ballista-sql.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use ballista::prelude::*;
-use datafusion::arrow::util::pretty;
 use datafusion::prelude::CsvReadOptions;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
@@ -45,8 +44,8 @@ async fn main() -> Result<()> {
         GROUP BY c1",
     )?;
 
-    let results = df.collect().await?;
-    pretty::print_batches(&results)?;
+    // print the results
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index a06b42a..a1cdf5d 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::arrow::util::pretty;
-
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -43,10 +41,9 @@ async fn main() -> Result<()> {
         WHERE c11 > 0.1 AND c11 < 0.9 \
         GROUP BY c1",
     )?;
-    let results = df.collect().await?;
 
     // print the results
-    pretty::print_batches(&results)?;
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index dcf6bc3..013f322 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::arrow::util::pretty;
-
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -37,11 +35,8 @@ async fn main() -> Result<()> {
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
 
-    // execute the query
-    let results = df.collect().await?;
-
     // print the results
-    pretty::print_batches(&results)?;
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/dataframe_in_memory.rs b/datafusion-examples/examples/dataframe_in_memory.rs
index 0c65a74..27ac079 100644
--- a/datafusion-examples/examples/dataframe_in_memory.rs
+++ b/datafusion-examples/examples/dataframe_in_memory.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use datafusion::arrow::array::{Int32Array, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::arrow::util::pretty;
 
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
@@ -57,11 +56,8 @@ async fn main() -> Result<()> {
 
     let df = df.select_columns(&["a", "b"])?.filter(filter)?;
 
-    // execute
-    let results = df.collect().await?;
-
     // print the results
-    pretty::print_batches(&results)?;
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/flight_client.rs b/datafusion-examples/examples/flight_client.rs
index 5334782..6fc8014 100644
--- a/datafusion-examples/examples/flight_client.rs
+++ b/datafusion-examples/examples/flight_client.rs
@@ -19,12 +19,12 @@ use std::convert::TryFrom;
 use std::sync::Arc;
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::util::pretty;
 
 use arrow_flight::flight_descriptor;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::{FlightDescriptor, Ticket};
+use datafusion::arrow::util::pretty;
 
 /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
 /// Parquet files and executing SQL queries against them on a remote server.
diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs
index f679b22..2f3ce91 100644
--- a/datafusion-examples/examples/parquet_sql.rs
+++ b/datafusion-examples/examples/parquet_sql.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::arrow::util::pretty;
-
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -41,10 +39,9 @@ async fn main() -> Result<()> {
         FROM alltypes_plain \
         WHERE id > 1 AND tinyint_col < double_col",
     )?;
-    let results = df.collect().await?;
 
     // print the results
-    pretty::print_batches(&results)?;
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf.rs
index 0ffec44..f71b9eb 100644
--- a/datafusion-examples/examples/simple_udf.rs
+++ b/datafusion-examples/examples/simple_udf.rs
@@ -19,7 +19,6 @@ use datafusion::arrow::{
     array::{ArrayRef, Float32Array, Float64Array},
     datatypes::DataType,
     record_batch::RecordBatch,
-    util::pretty,
 };
 
 use datafusion::prelude::*;
@@ -141,11 +140,8 @@ async fn main() -> Result<()> {
 
     // note that "b" is f32, not f64. DataFusion coerces the types to match the UDF's signature.
 
-    // execute the query
-    let results = df.collect().await?;
-
     // print the results
-    pretty::print_batches(&results)?;
+    df.show().await?;
 
     Ok(())
 }
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 608f6db..da64ffa 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -223,6 +223,36 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect(&self) -> Result<Vec<RecordBatch>>;
 
+    /// Executes this DataFrame and prints the results.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// df.show().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn show(&self) -> Result<()>;
+
+    /// Executes this DataFrame and prints the results, limited to at most `n` rows.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// df.show_limit(10).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn show_limit(&self, n: usize) -> Result<()>;
+
     /// Executes this DataFrame and returns a stream over a single partition
     ///
     /// ```
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index ddaa04e..5e1a4f4 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,7 @@ use crate::{
     physical_plan::{collect, collect_partitioned},
 };
 
+use crate::arrow::util::pretty;
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
 };
@@ -156,6 +157,18 @@ impl DataFrame for DataFrameImpl {
         Ok(collect(plan).await?)
     }
 
+    /// Collects the results of this DataFrame and pretty-prints the record batches.
+    async fn show(&self) -> Result<()> {
+        let results = self.collect().await?;
+        Ok(pretty::print_batches(&results)?)
+    }
+
+    /// Applies a limit of `num` rows, collects the results, and pretty-prints the record batches.
+    async fn show_limit(&self, num: usize) -> Result<()> {
+        let results = self.limit(num)?.collect().await?;
+        Ok(pretty::print_batches(&results)?)
+    }
+
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, returning a stream over a single partition
     async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
diff --git a/docs/user-guide/src/example-usage.md b/docs/user-guide/src/example-usage.md
index 2ea7dca..4280079 100644
--- a/docs/user-guide/src/example-usage.md
+++ b/docs/user-guide/src/example-usage.md
@@ -36,8 +36,7 @@ async fn main() -> datafusion::error::Result<()> {
   let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
 
   // execute and print results
-  let results: Vec<RecordBatch> = df.collect().await?;
-  print_batches(&results)?;
+  df.show().await?;
   Ok(())
 }
 ```
@@ -56,12 +55,10 @@ async fn main() -> datafusion::error::Result<()> {
   let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
 
   let df = df.filter(col("a").lt_eq(col("b")))?
-           .aggregate(vec![col("a")], vec![min(col("b"))])?
-           .limit(100)?;
+           .aggregate(vec![col("a")], vec![min(col("b"))])?;
 
   // execute and print results
-  let results: Vec<RecordBatch> = df.collect().await?;
-  print_batches(&results)?;
+  df.show_limit(100).await?;
   Ok(())
 }
 ```
diff --git a/pre-commit.sh b/pre-commit.sh
index 5ce0807..f82390e 100755
--- a/pre-commit.sh
+++ b/pre-commit.sh
@@ -20,7 +20,7 @@
 # This file is git pre-commit hook.
 #
 # Soft link it as git hook under top dir of apache arrow git repository:
-# $ ln -s  ../../rust/pre-commit.sh .git/hooks/pre-commit
+# $ ln -s  ../../pre-commit.sh .git/hooks/pre-commit
 #
 # This file be run directly:
 # $ ./pre-commit.sh
@@ -37,14 +37,12 @@ function BYELLOW() {
 	echo "\033[1;33m$@\033[0m"
 }
 
-RUST_DIR="rust"
-
 # env GIT_DIR is set by git when run a pre-commit hook.
 if [ -z "${GIT_DIR}" ]; then
 	GIT_DIR=$(git rev-parse --show-toplevel)
 fi
 
-cd ${GIT_DIR}/${RUST_DIR}
+cd ${GIT_DIR}
 
 NUM_CHANGES=$(git diff --cached --name-only . |
 	grep -e ".*/*.rs$" |