Posted to commits@arrow.apache.org by ag...@apache.org on 2021/07/27 18:31:28 UTC

[arrow-datafusion] branch master updated: Add Ballista examples (#775)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c74136d  Add Ballista examples (#775)
c74136d is described below

commit c74136d5649ad4446c98c15052ad4fffb6bb5dfc
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue Jul 27 11:31:20 2021 -0700

    Add Ballista examples (#775)
---
 Cargo.toml                                      |  1 +
 Cargo.toml => ballista-examples/Cargo.toml      | 32 ++++++++-----
 ballista-examples/README.md                     | 58 +++++++++++++++++++++++
 ballista-examples/src/bin/ballista-dataframe.rs | 56 ++++++++++++++++++++++
 ballista-examples/src/bin/ballista-sql.rs       | 63 +++++++++++++++++++++++++
 ballista/README.md                              | 42 ++++++++++-------
 6 files changed, 224 insertions(+), 28 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 351523d..d6da8c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ members = [
     "ballista/rust/core",
     "ballista/rust/executor",
     "ballista/rust/scheduler",
+    "ballista-examples",
 ]
 
 exclude = ["python"]
diff --git a/Cargo.toml b/ballista-examples/Cargo.toml
similarity index 54%
copy from Cargo.toml
copy to ballista-examples/Cargo.toml
index 351523d..b7d4022 100644
--- a/Cargo.toml
+++ b/ballista-examples/Cargo.toml
@@ -15,16 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = [
-    "datafusion",
-    "datafusion-cli",
-    "datafusion-examples",
-    "benchmarks",
-    "ballista/rust/client",
-    "ballista/rust/core",
-    "ballista/rust/executor",
-    "ballista/rust/scheduler",
-]
+[package]
+name = "ballista-examples"
+description = "Ballista usage examples"
+version = "0.5.0-SNAPSHOT"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "distributed", "query", "sql" ]
+edition = "2018"
+publish = false
 
-exclude = ["python"]
+[dependencies]
+arrow-flight = { version = "5.0" }
+datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
+prost = "0.7"
+tonic = "0.4"
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+futures = "0.3"
+num_cpus = "1.13.0"
diff --git a/ballista-examples/README.md b/ballista-examples/README.md
new file mode 100644
index 0000000..1364ad4
--- /dev/null
+++ b/ballista-examples/README.md
@@ -0,0 +1,58 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Ballista Examples
+
+This directory contains examples for executing distributed queries with Ballista.
+
+For background information on the Ballista architecture, refer to 
+the [Ballista README](../ballista/README.md).
+
+## Start a standalone cluster
+
+From the root of the arrow-datafusion project, build release binaries.
+
+```bash
+cargo build --release
+```
+
+Start a Ballista scheduler process in a new terminal session.
+
+```bash
+RUST_LOG=info ./target/release/ballista-scheduler
+```
+
+Start one or more Ballista executor processes in new terminal sessions. When starting more than one 
+executor, a unique port number must be specified for each executor.
+
+```bash
+RUST_LOG=info ./target/release/ballista-executor -c 4
+```
+
+## Running the examples
+
+Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and
+`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files.
+
+The examples can be run using the `cargo run --bin` syntax. 
+
+```bash
+cargo run --release --bin ballista-dataframe
+```
+
diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs
new file mode 100644
index 0000000..da7d99d
--- /dev/null
+++ b/ballista-examples/src/bin/ballista-dataframe.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::{col, lit};
+
+/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
+/// fetching results, using the DataFrame trait
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = BallistaConfig::builder()
+        .set("ballista.shuffle.partitions", "4")
+        .build()?;
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+    let testdata = datafusion::arrow::util::test_util::parquet_test_data();
+
+    let filename = &format!("{}/alltypes_plain.parquet", testdata);
+
+    // define the query using the DataFrame trait
+    let df = ctx
+        .read_parquet(filename)?
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(1)))?;
+
+    // execute the query - note that calling collect on the DataFrame
+    // trait will execute the query with DataFusion, so we have to call
+    // collect on the BallistaContext instead and pass it the DataFusion
+    // logical plan
+    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+    // print the results
+    let mut results = vec![];
+    while let Some(batch) = stream.next().await {
+        let batch = batch?;
+        results.push(batch);
+    }
+    pretty::print_batches(&results)?;
+
+    Ok(())
+}
diff --git a/ballista-examples/src/bin/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs
new file mode 100644
index 0000000..f9e7d18
--- /dev/null
+++ b/ballista-examples/src/bin/ballista-sql.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::prelude::*;
+use datafusion::arrow::util::pretty;
+use datafusion::prelude::CsvReadOptions;
+
+/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
+/// fetching results, using SQL
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = BallistaConfig::builder()
+        .set("ballista.shuffle.partitions", "4")
+        .build()?;
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
+
+    let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "aggregate_test_100",
+        &format!("{}/csv/aggregate_test_100.csv", testdata),
+        CsvReadOptions::new(),
+    )?;
+
+    // define the query using SQL
+    let df = ctx.sql(
+        "SELECT c1, MIN(c12), MAX(c12) \
+        FROM aggregate_test_100 \
+        WHERE c11 > 0.1 AND c11 < 0.9 \
+        GROUP BY c1",
+    )?;
+
+    // execute the query - note that calling collect on the DataFrame
+    // trait will execute the query with DataFusion, so we have to call
+    // collect on the BallistaContext instead and pass it the DataFusion
+    // logical plan
+    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
+
+    // print the results
+    let mut results = vec![];
+    while let Some(batch) = stream.next().await {
+        let batch = batch?;
+        results.push(batch);
+    }
+    pretty::print_batches(&results)?;
+
+    Ok(())
+}
diff --git a/ballista/README.md b/ballista/README.md
index 038146a..0a8db63 100644
--- a/ballista/README.md
+++ b/ballista/README.md
@@ -17,11 +17,11 @@
   under the License.
 -->
 
-# Ballista: Distributed Compute with Apache Arrow
+# Ballista: Distributed Compute with Apache Arrow and DataFusion
 
-Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built
-on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as
-first-class citizens without paying a penalty for serialization costs.
+Ballista is a distributed compute platform primarily implemented in Rust and powered by Apache Arrow and 
+DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and 
+Java) to be supported as first-class citizens without paying a penalty for serialization costs.
 
 The foundational technologies in Ballista are:
 
@@ -35,9 +35,30 @@ Ballista can be deployed as a standalone cluster and also supports [Kubernetes](
 case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
 redundancy in the case of a scheduler failing.
 
+# Getting Started
+
+Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for 
+more information.
+
+## Distributed Scheduler Overview
+
+Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a 
+distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes.
+
+Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator 
+of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution.
+
+Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators 
+and each task represents one *input* partition that will be executed. The resulting batches are repartitioned 
+according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format.
+
+The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle 
+tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight 
+interface and streams the shuffle IPC files.
+
 # How does this compare to Apache Spark?
 
-Although Ballista is largely inspired by Apache Spark, there are some key differences.
+Ballista implements a similar design to Apache Spark, but there are some key differences.
 
 - The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of
   GC pauses.
@@ -49,14 +70,3 @@ Although Ballista is largely inspired by Apache Spark, there are some key differ
   distributed compute.
 - The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors
   in any programming language with minimal serialization overhead.
-
-## Status
-
-Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in
-April 2021 and should be considered experimental.
-
-## Getting Started
-
-The [Ballista Developer Documentation](docs/README.md) and the
-[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the
-best sources of information for getting started with Ballista.
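The stage splitting described in the "Distributed Scheduler Overview" section of the ballista/README.md diff can be illustrated with a small Rust model. This is a minimal, hypothetical sketch: the `Plan` enum and the `plan_stages` and `resolve` functions are illustrative names invented here, not DataFusion or Ballista APIs. The real scheduler rewrites DataFusion `ExecutionPlan` trees and schedules one task per input partition. The sketch only shows the shape of the rewrite: each `Repartition` is cut out, its child becomes a stage rooted at a `ShuffleWriter`, a placeholder (`UnresolvedShuffle`) stands in for the missing input, and that placeholder is later swapped for a `ShuffleReader`.

```rust
// Hypothetical, simplified plan nodes. These are NOT the real DataFusion or
// Ballista types; the variant names only mirror the operators in the README.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Repartition { partitions: usize, input: Box<Plan> },
    /// Stands in for `UnresolvedShuffleExec`: output of a stage that has not run yet.
    UnresolvedShuffle { stage_id: usize },
    /// Stands in for `ShuffleReaderExec`: reads the shuffle output of a finished stage.
    ShuffleReader { stage_id: usize },
    /// Stands in for `ShuffleWriterExec`, the root of every stage.
    /// `None` means the output is not repartitioned (the final stage).
    ShuffleWriter { stage_id: usize, partitions: Option<usize>, input: Box<Plan> },
}

/// Recursively cut the plan at every `Repartition`: the child becomes a new
/// stage wrapped in a `ShuffleWriter`, and the repartition itself is replaced
/// by an `UnresolvedShuffle` that refers to that stage.
fn cut(plan: Plan, stages: &mut Vec<Plan>) -> Plan {
    match plan {
        Plan::Repartition { partitions, input } => {
            // Cut the subtree first so that child stages get lower ids.
            let child = cut(*input, stages);
            let stage_id = stages.len();
            stages.push(Plan::ShuffleWriter {
                stage_id,
                partitions: Some(partitions),
                input: Box::new(child),
            });
            Plan::UnresolvedShuffle { stage_id }
        }
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(cut(*input, stages)),
        },
        leaf => leaf,
    }
}

/// Split a plan into stages. Child stages are pushed before their parents,
/// so the last entry is always the final (non-repartitioned) stage.
pub fn plan_stages(plan: Plan) -> Vec<Plan> {
    let mut stages = Vec::new();
    let root = cut(plan, &mut stages);
    let stage_id = stages.len();
    stages.push(Plan::ShuffleWriter { stage_id, partitions: None, input: Box::new(root) });
    stages
}

/// Once all shuffle tasks of `finished_stage` have completed, replace its
/// `UnresolvedShuffle` placeholders with `ShuffleReader` nodes.
pub fn resolve(plan: Plan, finished_stage: usize) -> Plan {
    match plan {
        Plan::UnresolvedShuffle { stage_id } if stage_id == finished_stage => {
            Plan::ShuffleReader { stage_id }
        }
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(resolve(*input, finished_stage)),
        },
        Plan::Repartition { partitions, input } => Plan::Repartition {
            partitions,
            input: Box::new(resolve(*input, finished_stage)),
        },
        Plan::ShuffleWriter { stage_id, partitions, input } => Plan::ShuffleWriter {
            stage_id,
            partitions,
            input: Box::new(resolve(*input, finished_stage)),
        },
        other => other,
    }
}
```

For a filter over a repartitioned scan, `plan_stages` yields two stages: stage 0 writes the scan output into four shuffle partitions, and stage 1 applies the filter to an `UnresolvedShuffle { stage_id: 0 }` until `resolve(stage, 0)` turns it into a `ShuffleReader`. Recursing into the child before assigning a stage id is the design choice that keeps dependencies ordered, so running the stages front to back never reads a shuffle that has not been written.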