Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/05 06:35:54 UTC

[arrow-datafusion] branch master updated: Implement metrics for shuffle read and write (#676)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 58da159  Implement metrics for shuffle read and write (#676)
58da159 is described below

commit 58da15970dc0ec9e3c1c369fe89f6ba38e09d9c9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jul 5 00:35:47 2021 -0600

    Implement metrics for shuffle read and write (#676)
---
 ballista/rust/core/Cargo.toml                      |  1 +
 .../core/src/execution_plans/shuffle_reader.rs     | 26 +++++++++++---
 .../core/src/execution_plans/shuffle_writer.rs     | 42 ++++++++++++++++++----
 ballista/rust/core/src/serde/scheduler/mod.rs      | 12 ++++++-
 ballista/rust/core/src/utils.rs                    |  9 ++++-
 ballista/rust/executor/src/executor.rs             |  9 +++++
 6 files changed, 87 insertions(+), 12 deletions(-)
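
The recurring pattern in this commit: each operator creates a shared SQLMetric handle in its constructor, accumulates elapsed time into it during execute(), and exposes a snapshot through a new metrics() method. SQLMetric's internals are not shown in this diff; the sketch below uses a hypothetical TimeMetric backed by an atomic counter to illustrate how a metric can be updated through an Arc from &self (datafusion's SQLMetric offers the same add_elapsed() shape).

    // Minimal, self-contained sketch of the timing pattern used throughout
    // this commit. `TimeMetric` is a hypothetical stand-in for SQLMetric.
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::Instant;

    #[derive(Debug, Default)]
    struct TimeMetric {
        nanos: AtomicU64,
    }

    impl TimeMetric {
        // Interior mutability lets this take &self, so it works behind an Arc.
        fn add_elapsed(&self, start: Instant) {
            self.nanos
                .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        }
    }

    fn main() {
        let write_time = Arc::new(TimeMetric::default());
        let start = Instant::now();
        // ... timed work goes here ...
        write_time.add_elapsed(start);
        println!("writeTime = {} ns", write_time.nanos.load(Ordering::Relaxed));
    }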

diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index bedc097..3a89c75 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
 ahash = "0.7"
 async-trait = "0.1.36"
 futures = "0.3"
+hashbrown = "0.11"
 log = "0.4"
 prost = "0.7"
 serde = {version = "1", features = ["derive"]}
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 9ab0641..db03d3d 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,13 +28,17 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
 use futures::{future, Stream, StreamExt};
+use hashbrown::HashMap;
 use log::info;
+use std::time::Instant;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
 /// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
     /// Each partition of a shuffle can read data from multiple locations
     pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
+    /// Time to fetch data from executor
+    fetch_time: Arc<SQLMetric>,
 }
 
 impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
         partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
-        Ok(Self { partition, schema })
+        Ok(Self {
+            partition,
+            schema,
+            fetch_time: SQLMetric::time_nanos(),
+        })
     }
 }
 
@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
 
+        let start = Instant::now();
         let partition_locations = &self.partition[partition];
         let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
+        self.fetch_time.add_elapsed(start);
 
         let result = WrappedStream::new(
             Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
                         x.iter()
                             .map(|l| {
                                 format!(
-                                    "[executor={} part={}:{}:{} stats={:?}]",
+                                    "[executor={} part={}:{}:{} stats={}]",
                                     l.executor_meta.id,
                                     l.partition_id.job_id,
                                     l.partition_id.stage_id,
@@ -127,11 +139,17 @@ impl ExecutionPlan for ShuffleReaderExec {
                             .join(",")
                     })
                     .collect::<Vec<String>>()
-                    .join("\n");
+                    .join(", ");
                 write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
+        metrics
+    }
 }
 
 async fn fetch_partition(
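
Note how fetch_time is measured above: a single Instant brackets the future::join_all over every PartitionLocation, so the metric captures the wall-clock time to fetch all inputs for the partition, not a per-location sum. A minimal sketch of that shape, assuming only the futures crate (fetch() here is a trivial stand-in for fetch_partition):

    // One timer around join_all: records total wall-clock fetch time.
    use futures::future;
    use std::time::Instant;

    async fn fetch(i: u32) -> u32 {
        i // stand-in for fetching one partition location
    }

    fn main() {
        let start = Instant::now();
        let results =
            futures::executor::block_on(future::join_all((0..3).map(fetch)));
        // All fetches ran concurrently under one measurement.
        println!("fetched {:?} in {:?}", results, start.elapsed());
    }
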
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 7fffaba..92b4448 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,7 @@
 //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
 //! will use the ShuffleReaderExec to read these results.
 
+use std::fs::File;
 use std::iter::Iterator;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_join::create_hashes;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
 };
 use futures::StreamExt;
+use hashbrown::HashMap;
 use log::info;
-use std::fs::File;
 use uuid::Uuid;
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
+    /// Shuffle write metrics
+    metrics: ShuffleWriteMetrics,
+}
+
+#[derive(Debug, Clone)]
+struct ShuffleWriteMetrics {
+    /// Time spent writing batches to shuffle files
+    write_time: Arc<SQLMetric>,
+}
+
+impl ShuffleWriteMetrics {
+    fn new() -> Self {
+        Self {
+            write_time: SQLMetric::time_nanos(),
+        }
+    }
 }
 
 impl ShuffleWriterExec {
@@ -83,6 +100,7 @@ impl ShuffleWriterExec {
             plan,
             work_dir,
             shuffle_output_partitioning,
+            metrics: ShuffleWriteMetrics::new(),
         })
     }
 
@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
                 info!("Writing results to {}", path);
 
                 // stream results to disk
-                let stats = utils::write_stream_to_disk(&mut stream, path)
-                    .await
-                    .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+                let stats = utils::write_stream_to_disk(
+                    &mut stream,
+                    path,
+                    self.metrics.write_time.clone(),
+                )
+                .await
+                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
                 info!(
-                    "Executed partition {} in {} seconds. Statistics: {:?}",
+                    "Executed partition {} in {} seconds. Statistics: {}",
                     partition,
                     now.elapsed().as_secs(),
                     stats
@@ -231,6 +253,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                             RecordBatch::try_new(input_batch.schema(), columns)?;
 
                         // write batch out
+                        let start = Instant::now();
                         match &mut writers[num_output_partition] {
                             Some(w) => {
                                 w.write(&output_batch)?;
@@ -251,6 +274,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                                 writers[num_output_partition] = Some(writer);
                             }
                         }
+                        self.metrics.write_time.add_elapsed(start);
                     }
                 }
 
@@ -310,6 +334,12 @@ impl ExecutionPlan for ShuffleWriterExec {
         }
     }
 
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
+        metrics
+    }
+
     fn fmt_as(
         &self,
         t: DisplayFormatType,
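
Both metrics() implementations end with the same move: (*self.metrics.write_time).clone() dereferences the Arc and clones the SQLMetric value itself, so callers get a point-in-time snapshot that is independent of subsequent updates. A hedged sketch of that idiom, with a hypothetical Metric type and std's HashMap standing in for the hashbrown one used in the diff:

    // Snapshotting a shared metric into a plain map.
    use std::collections::HashMap;
    use std::sync::Arc;

    #[derive(Debug, Clone)]
    struct Metric {
        nanos: u64,
    }

    fn snapshot(write_time: &Arc<Metric>) -> HashMap<String, Metric> {
        let mut metrics = HashMap::new();
        // Deref the Arc, then clone the inner value: the map owns a copy.
        metrics.insert("writeTime".to_owned(), (**write_time).clone());
        metrics
    }

    fn main() {
        let write_time = Arc::new(Metric { nanos: 42 });
        let snap = snapshot(&write_time);
        println!("{:?}", snap.get("writeTime"));
    }
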
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index f66bb08..cbe1a31 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, fmt, sync::Arc};
 
 use datafusion::arrow::array::{
     ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
     }
 }
 
+impl fmt::Display for PartitionStats {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "numBatches={:?}, numRows={:?}, numBytes={:?}",
+            self.num_batches, self.num_rows, self.num_bytes
+        )
+    }
+}
+
 impl PartitionStats {
     pub fn new(
         num_rows: Option<u64>,
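
One subtlety in the new Display impl: the fields are Option values (see the constructor signature above) and are formatted with {:?}, so the rendered string includes the Some(..)/None wrappers. A self-contained illustration with the same three fields (sample values are made up):

    use std::fmt;

    struct PartitionStats {
        num_rows: Option<u64>,
        num_batches: Option<u64>,
        num_bytes: Option<u64>,
    }

    impl fmt::Display for PartitionStats {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(
                f,
                "numBatches={:?}, numRows={:?}, numBytes={:?}",
                self.num_batches, self.num_rows, self.num_bytes
            )
        }
    }

    fn main() {
        let stats = PartitionStats {
            num_rows: Some(100),
            num_batches: Some(1),
            num_bytes: Some(4096),
        };
        // Prints: numBatches=Some(1), numRows=Some(100), numBytes=Some(4096)
        println!("{}", stats);
    }
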
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 8a510f4..f7d884d 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -53,15 +53,17 @@ use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
+    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
 };
 use futures::{future, Stream, StreamExt};
+use std::time::Instant;
 
 /// Stream data to disk in Arrow IPC format
 
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
+    disk_write_metric: Arc<SQLMetric>,
 ) -> Result<PartitionStats> {
     let file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
         num_batches += 1;
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;
+
+        let start = Instant::now();
         writer.write(&batch)?;
+        disk_write_metric.add_elapsed(start);
     }
+    let start = Instant::now();
     writer.finish()?;
+    disk_write_metric.add_elapsed(start);
     Ok(PartitionStats::new(
         Some(num_rows as u64),
         Some(num_batches),
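
The placement of the timers in write_stream_to_disk is deliberate: the Instant is taken immediately before writer.write() (and writer.finish()) and the elapsed time added immediately after, so time spent polling the input stream for the next batch does not count toward the write metric. The same shape over a plain std::io::Write sink, as a runnable sketch:

    // Time only the I/O calls, not the work that produces each chunk.
    use std::io::Write;
    use std::time::Instant;

    fn write_all_timed<W: Write>(mut sink: W, chunks: &[&[u8]]) -> std::io::Result<u64> {
        let mut write_nanos = 0u64;
        for chunk in chunks {
            let start = Instant::now();
            sink.write_all(chunk)?;
            write_nanos += start.elapsed().as_nanos() as u64;
        }
        let start = Instant::now();
        sink.flush()?; // analogous to writer.finish() above
        write_nanos += start.elapsed().as_nanos() as u64;
        Ok(write_nanos)
    }

    fn main() -> std::io::Result<()> {
        let chunks: &[&[u8]] = &[b"batch-1", b"batch-2"];
        let nanos = write_all_timed(Vec::<u8>::new(), chunks)?;
        println!("write time: {} ns", nanos);
        Ok(())
    }
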
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 86aaa7e..4a75448 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::utils;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::ExecutionPlan;
 
 /// Ballista executor
@@ -60,6 +61,14 @@ impl Executor {
         )?;
         let mut stream = exec.execute(part).await?;
         let batches = utils::collect_stream(&mut stream).await?;
+
+        println!(
+            "=== Physical plan with metrics ===\n{}\n",
+            DisplayableExecutionPlan::with_metrics(&exec)
+                .indent()
+                .to_string()
+        );
+
         // the output should be a single batch containing metadata (path and statistics)
         assert!(batches.len() == 1);
         Ok(batches[0].clone())
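
With metrics() now available on both operators, a caller holding the concrete ShuffleWriterExec (as execute_shuffle_write does here) could also inspect the values directly rather than going through the plan printer. A hypothetical consumer, assuming each metric's accumulated time is read out as a plain nanosecond count:

    use std::collections::HashMap;

    // log_metrics is a hypothetical helper; the key names ("fetchTime",
    // "writeTime") come from the inserts in this commit.
    fn log_metrics(metrics: &HashMap<String, u64>) {
        for (name, nanos) in metrics {
            // e.g. "writeTime = 12.346 ms"
            println!("{} = {:.3} ms", name, *nanos as f64 / 1_000_000.0);
        }
    }

    fn main() {
        let mut metrics = HashMap::new();
        metrics.insert("writeTime".to_owned(), 12_345_678u64);
        log_metrics(&metrics);
    }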