You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/26 13:12:02 UTC
[arrow-ballista] branch master updated: Refine log level for trial info and periodically invoked places (#447)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 1b60feda Refine log level for trial info and periodically invoked places (#447)
1b60feda is described below
commit 1b60fedad49aebad72325f2e0d5273f3ebf92508
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Oct 26 21:11:57 2022 +0800
Refine log level for trivial info and periodically invoked places (#447)
---
ballista/executor/src/executor_server.rs | 2 ++
ballista/scheduler/src/scheduler_server/grpc.rs | 2 +-
ballista/scheduler/src/state/task_manager.rs | 17 ++++++++++++++---
3 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index acf5c873..ad3871fa 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -350,6 +350,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
partition_id: partition_id as usize,
};
+ info!("Start to execute shuffle write for task {}", task_identity);
+
let execution_result = self
.executor
.execute_shuffle_write(
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 7b5c96bd..c7c8b499 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -479,7 +479,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
request: Request<GetJobStatusParams>,
) -> Result<Response<GetJobStatusResult>, Status> {
let job_id = request.into_inner().job_id;
- debug!("Received get_job_status request for job {}", job_id);
+ trace!("Received get_job_status request for job {}", job_id);
match self.state.task_manager.get_job_status(&job_id).await {
Ok(status) => Ok(Response::new(GetJobStatusResult { status })),
Err(e) => {
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index fed7cce3..de55fd4c 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -47,6 +47,8 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
+use tracing::trace;
+
type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
// TODO move to configuration file
@@ -217,7 +219,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
) -> Result<Vec<QueryStageSchedulerEvent>> {
let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
for status in task_status {
- debug!("Task Update\n{:?}", status);
+ trace!("Task Update\n{:?}", status);
let job_id = status.job_id.clone();
let job_task_statuses = job_updates.entry(job_id).or_insert_with(Vec::new);
job_task_statuses.push(status);
@@ -668,17 +670,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
}
#[allow(dead_code)]
- pub fn prepare_multi_task_definition(
+ /// Prepare a MultiTaskDefinition with multiple tasks belonging to the same job stage
+ fn prepare_multi_task_definition(
&self,
tasks: Vec<TaskDescription>,
) -> Result<MultiTaskDefinition> {
- debug!("Preparing multi task definition for {:?}", tasks);
if let Some(task) = tasks.get(0) {
let session_id = task.session_id.clone();
let job_id = task.partition.job_id.clone();
let stage_id = task.partition.stage_id;
let stage_attempt_num = task.stage_attempt_num;
+ if log::max_level() >= log::Level::Debug {
+ let task_ids: Vec<usize> = tasks
+ .iter()
+ .map(|task| task.partition.partition_id)
+ .collect();
+ debug!("Preparing multi task definition for tasks {:?} belonging to job stage {}/{}", task_ids, job_id, stage_id);
+ trace!("With task details {:?}", tasks);
+ }
+
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
{