You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/24 12:43:34 UTC
[arrow-datafusion] branch master updated: [DataFusion] - Add show and show_limit function for DataFrame (#923)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5871207 [DataFusion] - Add show and show_limit function for DataFrame (#923)
5871207 is described below
commit 58712078391502b740cf1f92f960edf4d1721187
Author: Francis Du <fr...@francisdu.com>
AuthorDate: Tue Aug 24 20:43:27 2021 +0800
[DataFusion] - Add show and show_limit function for DataFrame (#923)
* feat: support show function for DataFrame
* fix: fix docs comments
* fix: fix typo
* fix: fix pre-commit
* fix: fix code format
* fix: improve show function implementation
* fix: change match pattern to 'if let' single pattern
* fix: Rewrite show function impl and add a new show_limit function
* fix: Add the show function to the sample code
* fix: fix cargo test error
---
README.md | 9 +++----
ballista-examples/src/bin/ballista-dataframe.rs | 5 ++--
ballista-examples/src/bin/ballista-sql.rs | 5 ++--
datafusion-examples/examples/csv_sql.rs | 5 +---
datafusion-examples/examples/dataframe.rs | 7 +----
.../examples/dataframe_in_memory.rs | 6 +----
datafusion-examples/examples/flight_client.rs | 2 +-
datafusion-examples/examples/parquet_sql.rs | 5 +---
datafusion-examples/examples/simple_udf.rs | 6 +----
datafusion/src/dataframe.rs | 30 ++++++++++++++++++++++
datafusion/src/execution/dataframe_impl.rs | 13 ++++++++++
docs/user-guide/src/example-usage.md | 9 +++----
pre-commit.sh | 6 ++---
13 files changed, 61 insertions(+), 47 deletions(-)
diff --git a/README.md b/README.md
index 0b1b679..ed2788c 100644
--- a/README.md
+++ b/README.md
@@ -82,8 +82,7 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
// execute and print results
- let results: Vec<RecordBatch> = df.collect().await?;
- print_batches(&results)?;
+ df.show().await?;
Ok(())
}
```
@@ -102,12 +101,10 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
let df = df.filter(col("a").lt_eq(col("b")))?
- .aggregate(vec![col("a")], vec![min(col("b"))])?
- .limit(100)?;
+ .aggregate(vec![col("a")], vec![min(col("b"))])?;
// execute and print results
- let results: Vec<RecordBatch> = df.collect().await?;
- print_batches(&results)?;
+ df.show_limit(100).await?;
Ok(())
}
```
diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
index 693e676..434ed7b 100644
--- a/ballista-examples/src/bin/ballista-dataframe.rs
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -16,7 +16,6 @@
// under the License.
use ballista::prelude::*;
-use datafusion::arrow::util::pretty;
use datafusion::prelude::{col, lit};
/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
@@ -38,8 +37,8 @@ async fn main() -> Result<()> {
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
- let results = df.collect().await?;
- pretty::print_batches(&results)?;
+ // print the results
+ df.show().await?;
Ok(())
}
diff --git a/ballista-examples/src/bin/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs
index 590ab7b..4b303e3 100644
--- a/ballista-examples/src/bin/ballista-sql.rs
+++ b/ballista-examples/src/bin/ballista-sql.rs
@@ -16,7 +16,6 @@
// under the License.
use ballista::prelude::*;
-use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;
/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
@@ -45,8 +44,8 @@ async fn main() -> Result<()> {
GROUP BY c1",
)?;
- let results = df.collect().await?;
- pretty::print_batches(&results)?;
+ // print the results
+ df.show().await?;
Ok(())
}
diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index a06b42a..a1cdf5d 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::arrow::util::pretty;
-
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -43,10 +41,9 @@ async fn main() -> Result<()> {
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
- let results = df.collect().await?;
// print the results
- pretty::print_batches(&results)?;
+ df.show().await?;
Ok(())
}
diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index dcf6bc3..013f322 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::arrow::util::pretty;
-
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -37,11 +35,8 @@ async fn main() -> Result<()> {
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
- // execute the query
- let results = df.collect().await?;
-
// print the results
- pretty::print_batches(&results)?;
+ df.show().await?;
Ok(())
}
diff --git a/datafusion-examples/examples/dataframe_in_memory.rs b/datafusion-examples/examples/dataframe_in_memory.rs
index 0c65a74..27ac079 100644
--- a/datafusion-examples/examples/dataframe_in_memory.rs
+++ b/datafusion-examples/examples/dataframe_in_memory.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::arrow::util::pretty;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
@@ -57,11 +56,8 @@ async fn main() -> Result<()> {
let df = df.select_columns(&["a", "b"])?.filter(filter)?;
- // execute
- let results = df.collect().await?;
-
// print the results
- pretty::print_batches(&results)?;
+ df.show().await?;
Ok(())
}
diff --git a/datafusion-examples/examples/flight_client.rs b/datafusion-examples/examples/flight_client.rs
index 5334782..6fc8014 100644
--- a/datafusion-examples/examples/flight_client.rs
+++ b/datafusion-examples/examples/flight_client.rs
@@ -19,12 +19,12 @@ use std::convert::TryFrom;
use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::util::pretty;
use arrow_flight::flight_descriptor;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, Ticket};
+use datafusion::arrow::util::pretty;
/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
/// Parquet files and executing SQL queries against them on a remote server.
diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs
index f679b22..2f3ce91 100644
--- a/datafusion-examples/examples/parquet_sql.rs
+++ b/datafusion-examples/examples/parquet_sql.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::arrow::util::pretty;
-
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -41,10 +39,9 @@ async fn main() -> Result<()> {
FROM alltypes_plain \
WHERE id > 1 AND tinyint_col < double_col",
)?;
- let results = df.collect().await?;
// print the results
- pretty::print_batches(&results)?;
+ df.show().await?;
Ok(())
}
diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf.rs
index 0ffec44..f71b9eb 100644
--- a/datafusion-examples/examples/simple_udf.rs
+++ b/datafusion-examples/examples/simple_udf.rs
@@ -19,7 +19,6 @@ use datafusion::arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
- util::pretty,
};
use datafusion::prelude::*;
@@ -141,11 +140,8 @@ async fn main() -> Result<()> {
// note that "b" is f32, not f64. DataFusion coerces the types to match the UDF's signature.
- // execute the query
- let results = df.collect().await?;
-
// print the results
- pretty::print_batches(&results)?;
+ df.show().await?;
Ok(())
}
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 608f6db..da64ffa 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -223,6 +223,36 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;
+ /// Print results.
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// df.show().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ async fn show(&self) -> Result<()>;
+
+ /// Print results and limit rows.
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// df.show_limit(10).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ async fn show_limit(&self, n: usize) -> Result<()>;
+
/// Executes this DataFrame and returns a stream over a single partition
///
/// ```
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index ddaa04e..5e1a4f4 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,7 @@ use crate::{
physical_plan::{collect, collect_partitioned},
};
+use crate::arrow::util::pretty;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
@@ -156,6 +157,18 @@ impl DataFrame for DataFrameImpl {
Ok(collect(plan).await?)
}
+ /// Print results.
+ async fn show(&self) -> Result<()> {
+ let results = self.collect().await?;
+ Ok(pretty::print_batches(&results)?)
+ }
+
+ /// Print results and limit rows.
+ async fn show_limit(&self, num: usize) -> Result<()> {
+ let results = self.limit(num)?.collect().await?;
+ Ok(pretty::print_batches(&results)?)
+ }
+
/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, returning a stream over a single partition
async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
diff --git a/docs/user-guide/src/example-usage.md b/docs/user-guide/src/example-usage.md
index 2ea7dca..4280079 100644
--- a/docs/user-guide/src/example-usage.md
+++ b/docs/user-guide/src/example-usage.md
@@ -36,8 +36,7 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
// execute and print results
- let results: Vec<RecordBatch> = df.collect().await?;
- print_batches(&results)?;
+ df.show().await?;
Ok(())
}
```
@@ -56,12 +55,10 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
let df = df.filter(col("a").lt_eq(col("b")))?
- .aggregate(vec![col("a")], vec![min(col("b"))])?
- .limit(100)?;
+ .aggregate(vec![col("a")], vec![min(col("b"))])?;
// execute and print results
- let results: Vec<RecordBatch> = df.collect().await?;
- print_batches(&results)?;
+ df.show_limit(100).await?;
Ok(())
}
```
diff --git a/pre-commit.sh b/pre-commit.sh
index 5ce0807..f82390e 100755
--- a/pre-commit.sh
+++ b/pre-commit.sh
@@ -20,7 +20,7 @@
# This file is git pre-commit hook.
#
# Soft link it as git hook under top dir of apache arrow git repository:
-# $ ln -s ../../rust/pre-commit.sh .git/hooks/pre-commit
+# $ ln -s ../../pre-commit.sh .git/hooks/pre-commit
#
# This file be run directly:
# $ ./pre-commit.sh
@@ -37,14 +37,12 @@ function BYELLOW() {
echo "\033[1;33m$@\033[0m"
}
-RUST_DIR="rust"
-
# env GIT_DIR is set by git when run a pre-commit hook.
if [ -z "${GIT_DIR}" ]; then
GIT_DIR=$(git rev-parse --show-toplevel)
fi
-cd ${GIT_DIR}/${RUST_DIR}
+cd ${GIT_DIR}
NUM_CHANGES=$(git diff --cached --name-only . |
grep -e ".*/*.rs$" |