You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/05 06:35:54 UTC
[arrow-datafusion] branch master updated: Implement metrics for shuffle read and write (#676)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 58da159 Implement metrics for shuffle read and write (#676)
58da159 is described below
commit 58da15970dc0ec9e3c1c369fe89f6ba38e09d9c9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jul 5 00:35:47 2021 -0600
Implement metrics for shuffle read and write (#676)
---
ballista/rust/core/Cargo.toml | 1 +
.../core/src/execution_plans/shuffle_reader.rs | 26 +++++++++++---
.../core/src/execution_plans/shuffle_writer.rs | 42 ++++++++++++++++++----
ballista/rust/core/src/serde/scheduler/mod.rs | 12 ++++++-
ballista/rust/core/src/utils.rs | 9 ++++-
ballista/rust/executor/src/executor.rs | 9 +++++
6 files changed, 87 insertions(+), 12 deletions(-)
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index bedc097..3a89c75 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
+hashbrown = "0.11"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 9ab0641..db03d3d 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,13 +28,17 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
+use hashbrown::HashMap;
use log::info;
+use std::time::Instant;
/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
/// Each partition of a shuffle can read data from multiple locations
pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
+ /// Time to fetch data from executor
+ fetch_time: Arc<SQLMetric>,
}
impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
- Ok(Self { partition, schema })
+ Ok(Self {
+ partition,
+ schema,
+ fetch_time: SQLMetric::time_nanos(),
+ })
}
}
@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);
+ let start = Instant::now();
let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
+ self.fetch_time.add_elapsed(start);
let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
x.iter()
.map(|l| {
format!(
- "[executor={} part={}:{}:{} stats={:?}]",
+ "[executor={} part={}:{}:{} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
@@ -127,11 +139,17 @@ impl ExecutionPlan for ShuffleReaderExec {
.join(",")
})
.collect::<Vec<String>>()
- .join("\n");
+ .join(", ");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
}
}
}
+
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ let mut metrics = HashMap::new();
+ metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
+ metrics
+ }
}
async fn fetch_partition(
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 7fffaba..92b4448 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,7 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.
+use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
};
use futures::StreamExt;
+use hashbrown::HashMap;
use log::info;
-use std::fs::File;
use uuid::Uuid;
/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
work_dir: String,
/// Optional shuffle output partitioning
shuffle_output_partitioning: Option<Partitioning>,
+ /// Shuffle write metrics
+ metrics: ShuffleWriteMetrics,
+}
+
+#[derive(Debug, Clone)]
+struct ShuffleWriteMetrics {
+ /// Time spent writing batches to shuffle files
+ write_time: Arc<SQLMetric>,
+}
+
+impl ShuffleWriteMetrics {
+ fn new() -> Self {
+ Self {
+ write_time: SQLMetric::time_nanos(),
+ }
+ }
}
impl ShuffleWriterExec {
@@ -83,6 +100,7 @@ impl ShuffleWriterExec {
plan,
work_dir,
shuffle_output_partitioning,
+ metrics: ShuffleWriteMetrics::new(),
})
}
@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
info!("Writing results to {}", path);
// stream results to disk
- let stats = utils::write_stream_to_disk(&mut stream, path)
- .await
- .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ let stats = utils::write_stream_to_disk(
+ &mut stream,
+ path,
+ self.metrics.write_time.clone(),
+ )
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
info!(
- "Executed partition {} in {} seconds. Statistics: {:?}",
+ "Executed partition {} in {} seconds. Statistics: {}",
partition,
now.elapsed().as_secs(),
stats
@@ -231,6 +253,7 @@ impl ExecutionPlan for ShuffleWriterExec {
RecordBatch::try_new(input_batch.schema(), columns)?;
// write batch out
+ let start = Instant::now();
match &mut writers[num_output_partition] {
Some(w) => {
w.write(&output_batch)?;
@@ -251,6 +274,7 @@ impl ExecutionPlan for ShuffleWriterExec {
writers[num_output_partition] = Some(writer);
}
}
+ self.metrics.write_time.add_elapsed(start);
}
}
@@ -310,6 +334,12 @@ impl ExecutionPlan for ShuffleWriterExec {
}
}
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ let mut metrics = HashMap::new();
+ metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
+ metrics
+ }
+
fn fmt_as(
&self,
t: DisplayFormatType,
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index f66bb08..cbe1a31 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, fmt, sync::Arc};
use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
}
}
+impl fmt::Display for PartitionStats {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "numBatches={:?}, numRows={:?}, numBytes={:?}",
+ self.num_batches, self.num_rows, self.num_bytes
+ )
+ }
+}
+
impl PartitionStats {
pub fn new(
num_rows: Option<u64>,
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 8a510f4..f7d884d 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -53,15 +53,17 @@ use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
- AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
+ AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
};
use futures::{future, Stream, StreamExt};
+use std::time::Instant;
/// Stream data to disk in Arrow IPC format
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
path: &str,
+ disk_write_metric: Arc<SQLMetric>,
) -> Result<PartitionStats> {
let file = File::create(&path).map_err(|e| {
BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
num_batches += 1;
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;
+
+ let start = Instant::now();
writer.write(&batch)?;
+ disk_write_metric.add_elapsed(start);
}
+ let start = Instant::now();
writer.finish()?;
+ disk_write_metric.add_elapsed(start);
Ok(PartitionStats::new(
Some(num_rows as u64),
Some(num_batches),
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 86aaa7e..4a75448 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::utils;
use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;
/// Ballista executor
@@ -60,6 +61,14 @@ impl Executor {
)?;
let mut stream = exec.execute(part).await?;
let batches = utils::collect_stream(&mut stream).await?;
+
+ println!(
+ "=== Physical plan with metrics ===\n{}\n",
+ DisplayableExecutionPlan::with_metrics(&exec)
+ .indent()
+ .to_string()
+ );
+
// the output should be a single batch containing metadata (path and statistics)
assert!(batches.len() == 1);
Ok(batches[0].clone())