Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/26 13:12:02 UTC

[arrow-ballista] branch master updated: Refine log level for trial info and periodically invoked places (#447)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b60feda Refine log level for trial info and periodically invoked places (#447)
1b60feda is described below

commit 1b60fedad49aebad72325f2e0d5273f3ebf92508
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Oct 26 21:11:57 2022 +0800

    Refine log level for trial info and periodically invoked places (#447)
---
 ballista/executor/src/executor_server.rs        |  2 ++
 ballista/scheduler/src/scheduler_server/grpc.rs |  2 +-
 ballista/scheduler/src/state/task_manager.rs    | 17 ++++++++++++++---
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index acf5c873..ad3871fa 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -350,6 +350,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
             partition_id: partition_id as usize,
         };
 
+        info!("Start to execute shuffle write for task {}", task_identity);
+
         let execution_result = self
             .executor
             .execute_shuffle_write(
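
The executor change above records the start of shuffle write execution for each task at info level, so the event shows up in default production logs without enabling debug output. A minimal, self-contained sketch of the same pattern, assuming the `log` facade; the env_logger backend and the task identity string are illustrative and not taken from the Ballista code:

    use log::info;

    fn execute_shuffle_write(task_identity: &str) {
        // One-off lifecycle events are logged at info level; per-poll or
        // per-iteration chatter belongs at trace (see the scheduler change below).
        info!("Start to execute shuffle write for task {}", task_identity);
        // ... the actual shuffle write would run here ...
    }

    fn main() {
        // Any `log`-compatible backend works; env_logger is used here for brevity.
        env_logger::init();
        execute_shuffle_write("job-1/stage-2/partition-0");
    }
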
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 7b5c96bd..c7c8b499 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -479,7 +479,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         request: Request<GetJobStatusParams>,
     ) -> Result<Response<GetJobStatusResult>, Status> {
         let job_id = request.into_inner().job_id;
-        debug!("Received get_job_status request for job {}", job_id);
+        trace!("Received get_job_status request for job {}", job_id);
         match self.state.task_manager.get_job_status(&job_id).await {
             Ok(status) => Ok(Response::new(GetJobStatusResult { status })),
             Err(e) => {
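
get_job_status is one of the "periodically invoked places" named in the commit title: clients poll it repeatedly, so its per-request log line moves from debug to trace. A hedged sketch of the level choice outside of the gRPC plumbing; the handler signatures below are simplified stand-ins, not the actual SchedulerGrpc API:

    use log::{debug, trace};

    fn handle_get_job_status(job_id: &str) {
        // Periodically invoked paths log at trace so that debug output
        // stays readable when troubleshooting other components.
        trace!("Received get_job_status request for job {}", job_id);
        // ... look up the job status and build the response ...
    }

    fn submit_job(job_id: &str) {
        // Infrequent, user-triggered events can stay at debug.
        debug!("Received job submission for job {}", job_id);
    }
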
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index fed7cce3..de55fd4c 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -47,6 +47,8 @@ use std::sync::Arc;
 use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
+use tracing::trace;
+
 type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
 
 // TODO move to configuration file
@@ -217,7 +219,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     ) -> Result<Vec<QueryStageSchedulerEvent>> {
         let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
         for status in task_status {
-            debug!("Task Update\n{:?}", status);
+            trace!("Task Update\n{:?}", status);
             let job_id = status.job_id.clone();
             let job_task_statuses = job_updates.entry(job_id).or_insert_with(Vec::new);
             job_task_statuses.push(status);
@@ -668,17 +670,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     }
 
     #[allow(dead_code)]
-    pub fn prepare_multi_task_definition(
+    /// Prepare a MultiTaskDefinition with multiple tasks belonging to the same job stage
+    fn prepare_multi_task_definition(
         &self,
         tasks: Vec<TaskDescription>,
     ) -> Result<MultiTaskDefinition> {
-        debug!("Preparing multi task definition for {:?}", tasks);
         if let Some(task) = tasks.get(0) {
             let session_id = task.session_id.clone();
             let job_id = task.partition.job_id.clone();
             let stage_id = task.partition.stage_id;
             let stage_attempt_num = task.stage_attempt_num;
 
+            if log::max_level() >= log::Level::Debug {
+                let task_ids: Vec<usize> = tasks
+                    .iter()
+                    .map(|task| task.partition.partition_id)
+                    .collect();
+                debug!("Preparing multi task definition for tasks {:?} belonging to job stage {}/{}", task_ids, job_id, stage_id);
+                trace!("With task details {:?}", tasks);
+            }
+
             if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
                 let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
                 {
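
The task_manager change is the most involved: rather than unconditionally formatting every TaskDescription with {:?}, it checks the active log level first, emits only the partition ids at debug, and leaves the full dump to trace. A condensed, self-contained sketch of that guard pattern, assuming the `log` crate; the TaskDescription struct here is a stand-in with only the field the example needs:

    use log::{debug, trace};

    #[derive(Debug)]
    struct TaskDescription {
        partition_id: usize,
        // ... other fields elided ...
    }

    fn log_task_preparation(job_id: &str, stage_id: usize, tasks: &[TaskDescription]) {
        // Skip the id collection and Debug formatting entirely unless debug
        // logging is enabled; log_enabled!(log::Level::Debug) is an equivalent check.
        if log::max_level() >= log::Level::Debug {
            let task_ids: Vec<usize> =
                tasks.iter().map(|task| task.partition_id).collect();
            debug!(
                "Preparing multi task definition for tasks {:?} belonging to job stage {}/{}",
                task_ids, job_id, stage_id
            );
            trace!("With task details {:?}", tasks);
        }
    }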