You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/05/25 12:25:12 UTC

[arrow-datafusion] branch master updated: Benchmark subcommand to distinguish between DataFusion and Ballista (#402)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ea59d05  Benchmark subcommand to distinguish between DataFusion and Ballista (#402)
ea59d05 is described below

commit ea59d05b6390a0f676956db9160805b3f660cb54
Author: Javier Goday <jg...@gmail.com>
AuthorDate: Tue May 25 14:25:04 2021 +0200

    Benchmark subcommand to distinguish between DataFusion and Ballista (#402)
    
    * #401: Add subcommand to TPC-H benchmark args to distinguish between DataFusion and Ballista
    
    * fix benchmark subcommand name
    
    * Fix lint
    
    * fix benchmark tests using DatafusionBenchmarkOpts
    
    * Fix DataFusionBenchmarkOpts name and update doc
---
 benchmarks/README.md                           |  8 +--
 benchmarks/run.sh                              |  2 +-
 benchmarks/src/bin/tpch.rs                     | 77 ++++++++++++++++++++------
 docs/user-guide/src/distributed/raspberrypi.md |  2 +-
 4 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index e003d96..e347130 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -44,13 +44,13 @@ to the `.gitignore` file.
 The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as:
 
 ```bash
-cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
 You can enable the features `simd` (to use SIMD instructions) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`:
 
 ```
-cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
+cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
 The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
@@ -123,7 +123,7 @@ To run the benchmarks:
 
 ```bash
 cd $ARROW_HOME/ballista/rust/benchmarks/tpch
-cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl
+cargo run --release benchmark ballista --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl
 ```
 
 ## Running the Ballista Benchmarks on docker-compose
@@ -140,7 +140,7 @@ docker-compose up
 Then you can run the benchmark with:
 
 ```bash
-docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl
+docker-compose run ballista-client cargo run benchmark ballista --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl
 ```
 
 ## Expected output
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index fd97ff9..8e36424 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -22,5 +22,5 @@ set -e
 cd /
 for query in 1 3 5 6 10 12
 do
-  /tpch benchmark --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
+  /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
 done
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 433bf2d..9ac66e1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -54,7 +54,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(Debug, StructOpt, Clone)]
-struct BenchmarkOpt {
+struct BallistaBenchmarkOpt {
     /// Query number
     #[structopt(short, long)]
     query: usize,
@@ -67,10 +67,6 @@ struct BenchmarkOpt {
     #[structopt(short = "i", long = "iterations", default_value = "3")]
     iterations: usize,
 
-    /// Number of threads to use for parallel execution
-    #[structopt(short = "c", long = "concurrency", default_value = "2")]
-    concurrency: usize,
-
     /// Batch size when reading CSV or Parquet files
     #[structopt(short = "s", long = "batch-size", default_value = "8192")]
     batch_size: usize,
@@ -100,6 +96,45 @@ struct BenchmarkOpt {
     port: Option<u16>,
 }
 
+#[derive(Debug, StructOpt, Clone)]
+struct DataFusionBenchmarkOpt {
+    /// Query number
+    #[structopt(short, long)]
+    query: usize,
+
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    iterations: usize,
+
+    /// Number of threads to use for parallel execution
+    #[structopt(short = "c", long = "concurrency", default_value = "2")]
+    concurrency: usize,
+
+    /// Batch size when reading CSV or Parquet files
+    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+    batch_size: usize,
+
+    /// Path to data files
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+
+    /// File format: `csv` or `parquet`
+    #[structopt(short = "f", long = "format", default_value = "csv")]
+    file_format: String,
+
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
+
+    /// Number of partitions to create when using MemTable as input
+    #[structopt(short = "n", long = "partitions", default_value = "8")]
+    partitions: usize,
+}
+
 #[derive(Debug, StructOpt)]
 struct ConvertOpt {
     /// Path to csv files
@@ -128,9 +163,18 @@ struct ConvertOpt {
 }
 
 #[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum BenchmarkSubCommandOpt {
+    #[structopt(name = "ballista")]
+    BallistaBenchmark(BallistaBenchmarkOpt),
+    #[structopt(name = "datafusion")]
+    DataFusionBenchmark(DataFusionBenchmarkOpt),
+}
+
+#[derive(Debug, StructOpt)]
 #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
 enum TpchOpt {
-    Benchmark(BenchmarkOpt),
+    Benchmark(BenchmarkSubCommandOpt),
     Convert(ConvertOpt),
 }
 
@@ -140,20 +184,21 @@ const TABLES: &[&str] = &[
 
 #[tokio::main]
 async fn main() -> Result<()> {
+    use BenchmarkSubCommandOpt::*;
+
     env_logger::init();
     match TpchOpt::from_args() {
-        TpchOpt::Benchmark(opt) => {
-            if opt.host.is_some() && opt.port.is_some() {
-                benchmark_ballista(opt).await.map(|_| ())
-            } else {
-                benchmark_datafusion(opt).await.map(|_| ())
-            }
+        TpchOpt::Benchmark(BallistaBenchmark(opt)) => {
+            benchmark_ballista(opt).await.map(|_| ())
+        }
+        TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
+            benchmark_datafusion(opt).await.map(|_| ())
         }
         TpchOpt::Convert(opt) => convert_tbl(opt).await,
     }
 }
 
-async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result<Vec<RecordBatch>> {
+async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let config = ExecutionConfig::new()
         .with_concurrency(opt.concurrency)
@@ -204,7 +249,7 @@ async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result<Vec<RecordBatch>> {
     Ok(result)
 }
 
-async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> {
+async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     println!("Running benchmarks with the following options: {:?}", opt);
 
     let mut settings = HashMap::new();
@@ -956,7 +1001,7 @@ mod tests {
             let expected = df.collect().await?;
 
             // run the query to compute actual results of the query
-            let opt = BenchmarkOpt {
+            let opt = DataFusionBenchmarkOpt {
                 query: n,
                 debug: false,
                 iterations: 1,
@@ -966,8 +1011,6 @@ mod tests {
                 file_format: "tbl".to_string(),
                 mem_table: false,
                 partitions: 16,
-                host: None,
-                port: None,
             };
             let actual = benchmark_datafusion(opt).await?;
 
diff --git a/docs/user-guide/src/distributed/raspberrypi.md b/docs/user-guide/src/distributed/raspberrypi.md
index c7e429a..0083d19 100644
--- a/docs/user-guide/src/distributed/raspberrypi.md
+++ b/docs/user-guide/src/distributed/raspberrypi.md
@@ -115,7 +115,7 @@ Run the benchmarks:
 
 ```bash
 docker run -it myrepo/ballista-arm64 \
-  /tpch benchmark --query=1 --path=/path/to/data --format=parquet \
-  --concurrency=24 --iterations=1 --debug --host=ballista-scheduler --port=50050
+  /tpch benchmark ballista --query=1 --path=/path/to/data --format=parquet \
+  --iterations=1 --debug --host=ballista-scheduler --port=50050
 ```