Posted to commits@arrow.apache.org by ag...@apache.org on 2021/07/27 18:31:28 UTC
[arrow-datafusion] branch master updated: Add Ballista examples (#775)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c74136d Add Ballista examples (#775)
c74136d is described below
commit c74136d5649ad4446c98c15052ad4fffb6bb5dfc
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue Jul 27 11:31:20 2021 -0700
Add Ballista examples (#775)
---
Cargo.toml | 1 +
Cargo.toml => ballista-examples/Cargo.toml | 32 ++++++++-----
ballista-examples/README.md | 58 +++++++++++++++++++++++
ballista-examples/src/bin/ballista-dataframe.rs | 56 ++++++++++++++++++++++
ballista-examples/src/bin/ballista-sql.rs | 63 +++++++++++++++++++++++++
ballista/README.md | 42 ++++++++++-------
6 files changed, 224 insertions(+), 28 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 351523d..d6da8c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ members = [
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
+ "ballista-examples",
]
exclude = ["python"]
diff --git a/Cargo.toml b/ballista-examples/Cargo.toml
similarity index 54%
copy from Cargo.toml
copy to ballista-examples/Cargo.toml
index 351523d..b7d4022 100644
--- a/Cargo.toml
+++ b/ballista-examples/Cargo.toml
@@ -15,16 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = [
- "datafusion",
- "datafusion-cli",
- "datafusion-examples",
- "benchmarks",
- "ballista/rust/client",
- "ballista/rust/core",
- "ballista/rust/executor",
- "ballista/rust/scheduler",
-]
+[package]
+name = "ballista-examples"
+description = "Ballista usage examples"
+version = "0.5.0-SNAPSHOT"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "distributed", "query", "sql" ]
+edition = "2018"
+publish = false
-exclude = ["python"]
+[dependencies]
+arrow-flight = { version = "5.0" }
+datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
+prost = "0.7"
+tonic = "0.4"
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+futures = "0.3"
+num_cpus = "1.13.0"
diff --git a/ballista-examples/README.md b/ballista-examples/README.md
new file mode 100644
index 0000000..1364ad4
--- /dev/null
+++ b/ballista-examples/README.md
@@ -0,0 +1,58 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Ballista Examples
+
+This directory contains examples for executing distributed queries with Ballista.
+
+For background information on the Ballista architecture, refer to
+the [Ballista README](../ballista/README.md).
+
+## Start a standalone cluster
+
+From the root of the arrow-datafusion project, build release binaries.
+
+```bash
+cargo build --release
+```
+
+Start a Ballista scheduler process in a new terminal session.
+
+```bash
+RUST_LOG=info ./target/release/ballista-scheduler
+```
+
+Start one or more Ballista executor processes in new terminal sessions. When starting more than one
+executor, a unique port number must be specified for each executor.
+
+```bash
+RUST_LOG=info ./target/release/ballista-executor -c 4
+```
+
+## Running the examples
+
+Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and
+`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files.
+
+The examples can be run using the `cargo run --bin` syntax.
+
+```bash
+cargo run --release --bin ballista-dataframe
+```
+
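The README asks for `ARROW_TEST_DATA` and `PARQUET_TEST_DATA` so the examples can find their input files. The helper below is a minimal, hypothetical sketch of that kind of lookup. `check_data_dir` and `resolve_test_data` are illustrative names, not the `arrow` crate's `test_util` implementation. It reads the variable, requires it to name an existing directory, and otherwise returns a readable error.

```rust
use std::env;
use std::path::PathBuf;

/// Hypothetical: validates a candidate test-data directory taken from `var_name`.
/// Kept separate from the environment lookup so it can be tested without mutating the env.
fn check_data_dir(var_name: &str, value: Option<String>) -> Result<PathBuf, String> {
    let value = value.ok_or_else(|| format!("{} is not set; see DEVELOPERS.md", var_name))?;
    let path = PathBuf::from(value);
    if path.is_dir() {
        Ok(path)
    } else {
        Err(format!("{} points to {:?}, which is not a directory", var_name, path))
    }
}

/// Hypothetical: reads a variable such as `PARQUET_TEST_DATA` and validates it.
fn resolve_test_data(var_name: &str) -> Result<PathBuf, String> {
    check_data_dir(var_name, env::var(var_name).ok())
}

fn main() {
    match resolve_test_data("PARQUET_TEST_DATA") {
        // Build the same file path that the DataFrame example reads.
        Ok(dir) => println!("{}", dir.join("alltypes_plain.parquet").display()),
        Err(e) => eprintln!("{}", e),
    }
}
```

Failing early with the variable name in the message is usually easier to diagnose than a later "file not found" from the query engine.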
diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
new file mode 100644
index 0000000..da7d99d
--- /dev/null
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::{col, lit};
+
+/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
+/// fetching results, using the DataFrame trait
+#[tokio::main]
+async fn main() -> Result<()> {
+ let config = BallistaConfig::builder()
+ .set("ballista.shuffle.partitions", "4")
+ .build()?;
+ let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+ let testdata = datafusion::arrow::util::test_util::parquet_test_data();
+
+ let filename = &format!("{}/alltypes_plain.parquet", testdata);
+
+ // define the query using the DataFrame trait
+ let df = ctx
+ .read_parquet(filename)?
+ .select_columns(&["id", "bool_col", "timestamp_col"])?
+ .filter(col("id").gt(lit(1)))?;
+
+ // execute the query - note that calling collect on the DataFrame
+ // trait will execute the query with DataFusion so we have to call
+ // collect on the BallistaContext instead and pass it the DataFusion
+ // logical plan
+ let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+ // print the results
+ let mut results = vec![];
+ while let Some(batch) = stream.next().await {
+ let batch = batch?;
+ results.push(batch);
+ }
+ pretty::print_batches(&results)?;
+
+ Ok(())
+}
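The example drains the result stream with a `while let` loop in which `batch?` returns at the first failed batch. The SQL example below uses the same loop. The sketch below reproduces that short-circuiting on a plain synchronous iterator so it runs with only the standard library. `Batch`, `collect_batches`, and `collect_batches_idiomatic` are hypothetical stand-ins for Arrow's `RecordBatch` and the loop in the example. For an async stream, the `futures` crate's `TryStreamExt::try_collect` should give the same behavior.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: only a row count here.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    num_rows: usize,
}

/// Mirrors the example's loop: push each successful batch and return the
/// first error encountered, discarding the batches collected so far.
fn collect_batches<I>(stream: I) -> Result<Vec<Batch>, String>
where
    I: IntoIterator<Item = Result<Batch, String>>,
{
    let mut results = vec![];
    for batch in stream {
        let batch = batch?;
        results.push(batch);
    }
    Ok(results)
}

/// Equivalent one-liner: `Result` implements `FromIterator`, so collecting
/// into `Result<Vec<_>, _>` also stops at the first error.
fn collect_batches_idiomatic<I>(stream: I) -> Result<Vec<Batch>, String>
where
    I: IntoIterator<Item = Result<Batch, String>>,
{
    stream.into_iter().collect()
}

fn main() {
    let ok = vec![Ok(Batch { num_rows: 2 }), Ok(Batch { num_rows: 3 })];
    let rows: usize = collect_batches(ok).unwrap().iter().map(|b| b.num_rows).sum();
    println!("total rows: {}", rows);
}
```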
diff --git a/ballista-examples/src/bin/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs
new file mode 100644
index 0000000..f9e7d18
--- /dev/null
+++ b/ballista-examples/src/bin/ballista-sql.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::CsvReadOptions;
+
+/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
+/// fetching results, using SQL
+#[tokio::main]
+async fn main() -> Result<()> {
+ let config = BallistaConfig::builder()
+ .set("ballista.shuffle.partitions", "4")
+ .build()?;
+ let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+ let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+
+ // register csv file with the execution context
+ ctx.register_csv(
+ "aggregate_test_100",
+ &format!("{}/csv/aggregate_test_100.csv", testdata),
+ CsvReadOptions::new(),
+ )?;
+
+ // define the query using SQL
+ let df = ctx.sql(
+ "SELECT c1, MIN(c12), MAX(c12) \
+ FROM aggregate_test_100 \
+ WHERE c11 > 0.1 AND c11 < 0.9 \
+ GROUP BY c1",
+ )?;
+
+ // execute the query - note that calling collect on the DataFrame
+ // trait will execute the query with DataFusion so we have to call
+ // collect on the BallistaContext instead and pass it the DataFusion
+ // logical plan
+ let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+ // print the results
+ let mut results = vec![];
+ while let Some(batch) = stream.next().await {
+ let batch = batch?;
+ results.push(batch);
+ }
+ pretty::print_batches(&results)?;
+
+ Ok(())
+}
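The sketch below makes the SQL example's semantics concrete. It computes `MIN(c12)` and `MAX(c12)` per `c1` for rows with `0.1 < c11 < 0.9` in one local loop, using a `BTreeMap` as the grouping table. The `Row` struct and its field types (`c1` as a string, `c11` as `f32`, `c12` as `f64`) are assumptions about the `aggregate_test_100` columns. In Ballista the same aggregation would be split across partitions and stages rather than run in a single loop.

```rust
use std::collections::BTreeMap;

/// Assumed shape of the three columns the query touches.
struct Row {
    c1: String,
    c11: f32,
    c12: f64,
}

/// Single-process equivalent of:
///   SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100
///   WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1
/// BTreeMap keeps groups sorted by key; the SQL query itself promises no order.
fn min_max_by_c1(rows: &[Row]) -> BTreeMap<String, (f64, f64)> {
    let mut groups: BTreeMap<String, (f64, f64)> = BTreeMap::new();
    for row in rows.iter().filter(|r| r.c11 > 0.1 && r.c11 < 0.9) {
        groups
            .entry(row.c1.clone())
            .and_modify(|(lo, hi)| {
                *lo = f64::min(*lo, row.c12);
                *hi = f64::max(*hi, row.c12);
            })
            .or_insert((row.c12, row.c12));
    }
    groups
}

fn main() {
    let row = |c1: &str, c11: f32, c12: f64| Row { c1: c1.to_string(), c11, c12 };
    let rows = vec![row("a", 0.5, 0.2), row("a", 0.3, 0.7), row("b", 0.95, 0.1), row("b", 0.2, 0.4)];
    for (c1, (lo, hi)) in min_max_by_c1(&rows) {
        println!("{} {} {}", c1, lo, hi);
    }
}
```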
diff --git a/ballista/README.md b/ballista/README.md
index 038146a..0a8db63 100644
--- a/ballista/README.md
+++ b/ballista/README.md
@@ -17,11 +17,11 @@
under the License.
-->
-# Ballista: Distributed Compute with Apache Arrow
+# Ballista: Distributed Compute with Apache Arrow and DataFusion
-Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built
-on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as
-first-class citizens without paying a penalty for serialization costs.
+Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and
+DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and
+Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
@@ -35,9 +35,30 @@ Ballista can be deployed as a standalone cluster and also supports [Kubernetes](
case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
redundancy in the case of a scheduler failing.
+# Getting Started
+
+Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for
+more information.
+
+## Distributed Scheduler Overview
+
+Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a
+distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes.
+
+Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator
+of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution.
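The plan rewrite described above can be sketched on a toy plan tree. The `Plan` enum is a hypothetical, heavily simplified stand-in for DataFusion's `ExecutionPlan` operators, which also carry schemas, partitioning, and more. Every `Repartition` node becomes an `UnresolvedShuffle` that references a new stage. Its child is wrapped in a `ShuffleWriter` and recorded as that stage, with child stages recorded before their parents.

```rust
/// Hypothetical, simplified physical plan; names loosely mirror the operators in the text.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter(Box<Plan>),
    Repartition(Box<Plan>),
    ShuffleWriter { stage_id: usize, input: Box<Plan> },
    UnresolvedShuffle { stage_id: usize },
}

/// Replaces each `Repartition` with an `UnresolvedShuffle` and pushes the
/// rewritten child, wrapped in a `ShuffleWriter`, onto `stages`.
fn split_into_stages(plan: Plan, stages: &mut Vec<Plan>) -> Plan {
    match plan {
        Plan::Repartition(input) => {
            // Rewrite the subtree first so nested repartitions become earlier stages.
            let input = split_into_stages(*input, stages);
            let stage_id = stages.len() + 1;
            stages.push(Plan::ShuffleWriter { stage_id, input: Box::new(input) });
            Plan::UnresolvedShuffle { stage_id }
        }
        Plan::Filter(input) => Plan::Filter(Box::new(split_into_stages(*input, stages))),
        other => other,
    }
}

fn main() {
    // Filter <- Repartition <- Scan
    let plan = Plan::Filter(Box::new(Plan::Repartition(Box::new(Plan::Scan("t".into())))));
    let mut stages = Vec::new();
    let final_plan = split_into_stages(plan, &mut stages);
    println!("stages: {:?}", stages);
    println!("final plan: {:?}", final_plan);
}
```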
+
+Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators
+and each task represents one *input* partition that will be executed. The resulting batches are repartitioned
+according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format.
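The repartitioning step inside a shuffle task can be illustrated with a hash partitioner. This is a hypothetical sketch. It hashes a single integer key per row with the standard library's `DefaultHasher` and assigns the row to `hash % num_partitions`. DataFusion's actual hash function and batch layout differ, and writing each output partition to disk in Arrow IPC format is omitted.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assigns a key to one of `num_partitions` output partitions.
fn partition_for(key: i64, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Splits the rows (here just keys) of one input partition into one bucket
/// per output partition. Rows with equal keys always land in the same bucket.
fn hash_repartition(keys: &[i64], num_partitions: usize) -> Vec<Vec<i64>> {
    assert!(num_partitions > 0, "need at least one output partition");
    let mut outputs = vec![Vec::new(); num_partitions];
    for &key in keys {
        outputs[partition_for(key, num_partitions)].push(key);
    }
    outputs
}

fn main() {
    // The examples above set `ballista.shuffle.partitions` to 4.
    let outputs = hash_repartition(&[1, 2, 3, 4, 5, 1, 2], 4);
    for (i, rows) in outputs.iter().enumerate() {
        println!("output partition {}: {:?}", i, rows);
    }
}
```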
+
+The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle
+tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight
+interface, and streams the shuffle IPC files.
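The scheduler's resolution rule can be sketched as a check over task states. An unresolved shuffle input for a stage becomes a reader only after every task in that stage has completed, and the reader then carries the locations of the shuffle output. `TaskState`, `ShuffleInput`, `try_resolve`, and the `executor:path` location strings are hypothetical. The real scheduler tracks richer metadata, and the real reader fetches data from executors over Arrow Flight.

```rust
use std::collections::HashMap;

/// Hypothetical per-task status tracked by the scheduler.
#[derive(Debug, PartialEq)]
enum TaskState {
    Pending,
    Running,
    Completed { location: String },
}

/// Hypothetical stand-ins for `UnresolvedShuffleExec` and `ShuffleReaderExec`.
#[derive(Debug, PartialEq)]
enum ShuffleInput {
    Unresolved { stage_id: usize },
    Reader { stage_id: usize, locations: Vec<String> },
}

/// Resolves the input only if the stage has tasks and all of them completed.
fn try_resolve(input: ShuffleInput, tasks: &HashMap<usize, Vec<TaskState>>) -> ShuffleInput {
    let stage_id = match input {
        ShuffleInput::Unresolved { stage_id } => stage_id,
        already_resolved => return already_resolved,
    };
    let states = match tasks.get(&stage_id) {
        Some(states) if !states.is_empty() => states,
        _ => return ShuffleInput::Unresolved { stage_id },
    };
    // Collecting into `Option<Vec<_>>` yields `None` if any task is unfinished.
    let locations: Option<Vec<String>> = states
        .iter()
        .map(|state| match state {
            TaskState::Completed { location } => Some(location.clone()),
            _ => None,
        })
        .collect();
    match locations {
        Some(locations) => ShuffleInput::Reader { stage_id, locations },
        None => ShuffleInput::Unresolved { stage_id },
    }
}

fn main() {
    let mut tasks: HashMap<usize, Vec<TaskState>> = HashMap::new();
    tasks.insert(1, vec![
        TaskState::Completed { location: "executor-a:/shuffle/1/0".to_string() },
        TaskState::Running,
    ]);
    // One task is still running, so stage 1 stays unresolved.
    println!("{:?}", try_resolve(ShuffleInput::Unresolved { stage_id: 1 }, &tasks));

    tasks.insert(1, vec![
        TaskState::Completed { location: "executor-a:/shuffle/1/0".to_string() },
        TaskState::Completed { location: "executor-b:/shuffle/1/1".to_string() },
    ]);
    println!("{:?}", try_resolve(ShuffleInput::Unresolved { stage_id: 1 }, &tasks));
}
```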
+
# How does this compare to Apache Spark?
-Although Ballista is largely inspired by Apache Spark, there are some key differences.
+Ballista implements a similar design to Apache Spark, but there are some key differences.
- The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of
GC pauses.
@@ -49,14 +70,3 @@ Although Ballista is largely inspired by Apache Spark, there are some key differ
distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors
in any programming language with minimal serialization overhead.
-
-## Status
-
-Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in
-April 2021 and should be considered experimental.
-
-## Getting Started
-
-The [Ballista Developer Documentation](docs/README.md) and the
-[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the
-best sources of information for getting started with Ballista.