You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/15 15:38:33 UTC

[arrow-datafusion] branch master updated: Add ExecutorMetricsCollector interface (#2234)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f2ed423d Add ExecutorMetricsCollector interface (#2234)
9f2ed423d is described below

commit 9f2ed423dc63f9f5d0a5e586925d2c31e3b9f5b8
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Fri Apr 15 11:38:28 2022 -0400

    Add ExecutorMetricsCollector interface (#2234)
    
    * Add ExecutorMetricsCollector interface
    
    * Comments and license header
---
 ballista/rust/executor/src/executor.rs    | 17 ++++-----
 ballista/rust/executor/src/lib.rs         |  1 +
 ballista/rust/executor/src/main.rs        | 10 +++++-
 ballista/rust/executor/src/metrics/mod.rs | 58 +++++++++++++++++++++++++++++++
 ballista/rust/executor/src/standalone.rs  |  2 ++
 5 files changed, 79 insertions(+), 9 deletions(-)

diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index e64e30861..fa092137a 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -20,6 +20,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use crate::metrics::ExecutorMetricsCollector;
 use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
@@ -27,7 +28,7 @@ use ballista_core::serde::protobuf::ExecutorRegistration;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
+
 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
@@ -48,6 +49,9 @@ pub struct Executor {
 
     /// Runtime environment for Executor
     pub runtime: Arc<RuntimeEnv>,
+
+    /// Collector for runtime execution metrics
+    pub metrics_collector: Arc<dyn ExecutorMetricsCollector>,
 }
 
 impl Executor {
@@ -56,6 +60,7 @@ impl Executor {
         metadata: ExecutorRegistration,
         work_dir: &str,
         runtime: Arc<RuntimeEnv>,
+        metrics_collector: Arc<dyn ExecutorMetricsCollector>,
     ) -> Self {
         Self {
             metadata,
@@ -64,6 +69,7 @@ impl Executor {
             scalar_functions: HashMap::new(),
             aggregate_functions: HashMap::new(),
             runtime,
+            metrics_collector,
         }
     }
 }
@@ -101,13 +107,8 @@ impl Executor {
 
         let partitions = exec.execute_shuffle_write(part, task_ctx).await?;
 
-        println!(
-            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
-            job_id,
-            stage_id,
-            part,
-            DisplayableExecutionPlan::with_metrics(&exec).indent()
-        );
+        self.metrics_collector
+            .record_stage(&job_id, stage_id, part, exec);
 
         Ok(partitions)
     }
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index bb6dfa69b..4d145b269 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -22,6 +22,7 @@ pub mod execution_loop;
 pub mod executor;
 pub mod executor_server;
 pub mod flight_service;
+pub mod metrics;
 
 mod cpu_bound_executor;
 mod standalone;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 66d9bd71a..825ddd4d8 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -42,6 +42,7 @@ use ballista_core::serde::BallistaCodec;
 use ballista_core::{print_version, BALLISTA_VERSION};
 use ballista_executor::executor::Executor;
 use ballista_executor::flight_service::BallistaFlightService;
+use ballista_executor::metrics::LoggingMetricsCollector;
 use config::prelude::*;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 
@@ -118,7 +119,14 @@ async fn main() -> Result<()> {
         BallistaError::Internal("Failed to init Executor RuntimeEnv".to_owned())
     })?);
 
-    let executor = Arc::new(Executor::new(executor_meta, &work_dir, runtime));
+    let metrics_collector = Arc::new(LoggingMetricsCollector::default());
+
+    let executor = Arc::new(Executor::new(
+        executor_meta,
+        &work_dir,
+        runtime,
+        metrics_collector,
+    ));
 
     let scheduler = SchedulerGrpcClient::connect(scheduler_url)
         .await
diff --git a/ballista/rust/executor/src/metrics/mod.rs b/ballista/rust/executor/src/metrics/mod.rs
new file mode 100644
index 000000000..2c7e1d504
--- /dev/null
+++ b/ballista/rust/executor/src/metrics/mod.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista_core::execution_plans::ShuffleWriterExec;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+
+/// `ExecutorMetricsCollector` records metrics for a `ShuffleWriterExec`
+/// after it has been executed.
+///
+/// `Executor` calls `record_stage` once a stage partition has executed
+/// successfully; it is not called when execution returns an error.
+pub trait ExecutorMetricsCollector: Send + Sync {
+    /// Record metrics for one partition of a stage after it has executed
+    fn record_stage(
+        &self,
+        job_id: &str, // job the stage belongs to
+        stage_id: usize, // stage within the job
+        partition: usize, // partition of the stage that was executed
+        plan: ShuffleWriterExec, // the executed plan, passed by value
+    );
+}
+
+/// `ExecutorMetricsCollector` that prints each completed plan with its
+/// metrics to stdout via `println!` (not through a logging framework).
+#[derive(Default)]
+pub struct LoggingMetricsCollector {}
+
+impl ExecutorMetricsCollector for LoggingMetricsCollector {
+    fn record_stage(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        partition: usize,
+        plan: ShuffleWriterExec,
+    ) {
+        println!(
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
+            job_id,
+            stage_id,
+            partition,
+            DisplayableExecutionPlan::with_metrics(&plan).indent() // indented plan tree annotated with metrics
+        );
+    }
+}
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index edbb857cc..d68052af8 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -34,6 +34,7 @@ use tokio::net::TcpListener;
 use tonic::transport::{Channel, Server};
 use uuid::Uuid;
 
+use crate::metrics::LoggingMetricsCollector;
 use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
 
 pub async fn new_standalone_executor<
@@ -78,6 +79,7 @@ pub async fn new_standalone_executor<
         executor_meta,
         &work_dir,
         Arc::new(RuntimeEnv::new(config).unwrap()),
+        Arc::new(LoggingMetricsCollector::default()),
     ));
 
     let service = BallistaFlightService::new(executor.clone());