You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/15 15:38:33 UTC
[arrow-datafusion] branch master updated: Add ExecutorMetricsCollector interface (#2234)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9f2ed423d Add ExecutorMetricsCollector interface (#2234)
9f2ed423d is described below
commit 9f2ed423dc63f9f5d0a5e586925d2c31e3b9f5b8
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Fri Apr 15 11:38:28 2022 -0400
Add ExecutorMetricsCollector interface (#2234)
* Add ExecutorMetricsCollector interface
* Comments and license header
---
ballista/rust/executor/src/executor.rs | 17 ++++-----
ballista/rust/executor/src/lib.rs | 1 +
ballista/rust/executor/src/main.rs | 10 +++++-
ballista/rust/executor/src/metrics/mod.rs | 58 +++++++++++++++++++++++++++++++
ballista/rust/executor/src/standalone.rs | 2 ++
5 files changed, 79 insertions(+), 9 deletions(-)
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index e64e30861..fa092137a 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -20,6 +20,7 @@
use std::collections::HashMap;
use std::sync::Arc;
+use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
@@ -27,7 +28,7 @@ use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
+
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
@@ -48,6 +49,9 @@ pub struct Executor {
/// Runtime environment for Executor
pub runtime: Arc<RuntimeEnv>,
+
+ /// Collector for runtime execution metrics
+ pub metrics_collector: Arc<dyn ExecutorMetricsCollector>,
}
impl Executor {
@@ -56,6 +60,7 @@ impl Executor {
metadata: ExecutorRegistration,
work_dir: &str,
runtime: Arc<RuntimeEnv>,
+ metrics_collector: Arc<dyn ExecutorMetricsCollector>,
) -> Self {
Self {
metadata,
@@ -64,6 +69,7 @@ impl Executor {
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
runtime,
+ metrics_collector,
}
}
}
@@ -101,13 +107,8 @@ impl Executor {
let partitions = exec.execute_shuffle_write(part, task_ctx).await?;
- println!(
- "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
- job_id,
- stage_id,
- part,
- DisplayableExecutionPlan::with_metrics(&exec).indent()
- );
+ self.metrics_collector
+ .record_stage(&job_id, stage_id, part, exec);
Ok(partitions)
}
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index bb6dfa69b..4d145b269 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -22,6 +22,7 @@ pub mod execution_loop;
pub mod executor;
pub mod executor_server;
pub mod flight_service;
+pub mod metrics;
mod cpu_bound_executor;
mod standalone;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 66d9bd71a..825ddd4d8 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -42,6 +42,7 @@ use ballista_core::serde::BallistaCodec;
use ballista_core::{print_version, BALLISTA_VERSION};
use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
+use ballista_executor::metrics::LoggingMetricsCollector;
use config::prelude::*;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -118,7 +119,14 @@ async fn main() -> Result<()> {
BallistaError::Internal("Failed to init Executor RuntimeEnv".to_owned())
})?);
- let executor = Arc::new(Executor::new(executor_meta, &work_dir, runtime));
+ let metrics_collector = Arc::new(LoggingMetricsCollector::default());
+
+ let executor = Arc::new(Executor::new(
+ executor_meta,
+ &work_dir,
+ runtime,
+ metrics_collector,
+ ));
let scheduler = SchedulerGrpcClient::connect(scheduler_url)
.await
diff --git a/ballista/rust/executor/src/metrics/mod.rs b/ballista/rust/executor/src/metrics/mod.rs
new file mode 100644
index 000000000..2c7e1d504
--- /dev/null
+++ b/ballista/rust/executor/src/metrics/mod.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista_core::execution_plans::ShuffleWriterExec;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+
+/// `ExecutorMetricsCollector` records metrics for a `ShuffleWriterExec`
+/// after it has been executed.
+///
+/// The `Executor` calls `ExecutorMetricsCollector::record_stage` once
+/// `execute_shuffle_write` has finished for a stage partition, handing over
+/// the executed plan so that implementations can inspect its metrics.
+///
+/// The `Send + Sync` bound allows a collector to be shared across threads,
+/// e.g. as the `Arc<dyn ExecutorMetricsCollector>` held by the `Executor`.
+pub trait ExecutorMetricsCollector: Send + Sync {
+    /// Record metrics for a stage partition after it has been executed.
+    ///
+    /// * `job_id` - the job the executed stage belongs to
+    /// * `stage_id` - the stage that was executed
+    /// * `partition` - the partition of the stage that was executed
+    /// * `plan` - the executed `ShuffleWriterExec`, passed by value
+    fn record_stage(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        partition: usize,
+        plan: ShuffleWriterExec,
+    );
+}
+
+/// An `ExecutorMetricsCollector` that prints every executed stage plan,
+/// annotated with its collected metrics, to stdout via `println!`.
+#[derive(Default)]
+pub struct LoggingMetricsCollector {}
+
+impl ExecutorMetricsCollector for LoggingMetricsCollector {
+    /// Print the executed plan with metrics, prefixed by a
+    /// `[job_id/stage_id/partition]` header.
+    fn record_stage(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        partition: usize,
+        plan: ShuffleWriterExec,
+    ) {
+        // Build the metrics-annotated view once, then render it indented.
+        let displayable = DisplayableExecutionPlan::with_metrics(&plan);
+        println!(
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
+            job_id,
+            stage_id,
+            partition,
+            displayable.indent()
+        );
+    }
+}
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index edbb857cc..d68052af8 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -34,6 +34,7 @@ use tokio::net::TcpListener;
use tonic::transport::{Channel, Server};
use uuid::Uuid;
+use crate::metrics::LoggingMetricsCollector;
use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
pub async fn new_standalone_executor<
@@ -78,6 +79,7 @@ pub async fn new_standalone_executor<
executor_meta,
&work_dir,
Arc::new(RuntimeEnv::new(config).unwrap()),
+ Arc::new(LoggingMetricsCollector::default()),
));
let service = BallistaFlightService::new(executor.clone());