You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/19 08:14:46 UTC
[linkis] branch dev-1.4.0 updated: [Feature]Add MDC log format to Linkis for tracking JobID #4553 (#4554)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 7544426a3 [Feature]Add MDC log format to Linkis for tracking JobID #4553 (#4554)
7544426a3 is described below
commit 7544426a39960b4ab7e28bfcda50585f273ad7f8
Author: peacewong <wp...@gmail.com>
AuthorDate: Fri May 19 16:14:38 2023 +0800
[Feature]Add MDC log format to Linkis for tracking JobID #4553 (#4554)
---
.../governance/common/utils/LoggerUtils.java | 43 +++++
.../common/conf/GovernanceCommonConf.scala | 3 +
.../impl/AbstractEngineConnLaunchService.scala | 20 ++-
.../executor/async/AsyncEngineConnJob.scala | 10 ++
.../executor/async/AsyncExecuteExecutor.scala | 16 +-
.../executor/execute/ComputationExecutor.scala | 7 +-
.../service/TaskExecutionServiceImpl.scala | 181 +++++++++++----------
.../executor/lock/EngineConnTimedLock.scala | 16 +-
.../apache/linkis/entrance/EntranceServer.scala | 6 +
.../entrance/execute/DefaultEntranceExecutor.scala | 5 +
.../linkis/entrance/execute/EntranceExecutor.scala | 14 +-
.../entrance/execute/EntranceExecutorManager.scala | 2 +-
.../manager/am/pointer/DefaultEMNodPointer.java | 5 +-
.../engine/DefaultEngineAskEngineService.java | 71 ++++----
linkis-dist/package/conf/log4j2.xml | 2 +-
.../elasticsearch/src/main/resources/log4j2.xml | 8 +-
.../flink/src/main/resources/log4j2.xml | 6 +-
.../hive/src/main/resources/log4j2.xml | 8 +-
.../io_file/src/main/resources/log4j2.xml | 6 +-
.../jdbc/src/main/resources/log4j2.xml | 6 +-
.../openlookeng/src/main/resources/log4j2.xml | 6 +-
.../pipeline/src/main/resources/log4j2.xml | 6 +-
.../presto/src/main/resources/log4j2.xml | 8 +-
.../python/src/main/resources/log4j2.xml | 6 +-
.../seatunnel/src/main/resources/log4j2.xml | 6 +-
.../shell/src/main/resources/conf/log4j2.xml | 6 +-
.../spark/src/main/resources/log4j2.xml | 10 +-
.../sqoop/src/main/resources/log4j2.xml | 8 +-
.../trino/src/main/resources/log4j2.xml | 6 +-
.../catalyst/planner/TaskPlannerTransform.scala | 28 +++-
.../service/ComputationTaskExecutionReceiver.scala | 13 +-
.../strategy/async/AsyncExecTaskRunnerImpl.scala | 50 +++---
.../utils/OrchestratorLoggerUtils.scala | 37 +++++
linkis-orchestrator/pom.xml | 1 +
34 files changed, 411 insertions(+), 215 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/utils/LoggerUtils.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/utils/LoggerUtils.java
new file mode 100644
index 000000000..99addd4fe
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/utils/LoggerUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.utils;
+
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
+
+import java.util.Map;
+
+import org.slf4j.MDC;
+
+/**
+ * Static helpers that put the current job id into, and remove it from, the SLF4J {@link MDC}
+ * under the key {@code JobRequestConstants.JOB_ID()}.
+ *
+ * <p>Both setters honour the {@code linkis.mdc.log.enabled} switch exposed as {@code
+ * GovernanceCommonConf.MDC_ENABLED()}: when it is false nothing is written to the MDC.
+ */
+public class LoggerUtils {
+
+  /**
+   * Stores {@code jobId} in the MDC when the MDC switch is enabled; otherwise does nothing.
+   *
+   * @param jobId the job id to expose to the logging context
+   */
+  public static void setJobIdMDC(String jobId) {
+    // Gate on the same switch as the Map overload so that disabling MDC really disables it.
+    if (GovernanceCommonConf.MDC_ENABLED()) {
+      MDC.put(JobRequestConstants.JOB_ID(), jobId);
+    }
+  }
+
+  /**
+   * Resolves the job id from {@code props} via {@code JobUtils.getJobIdFromMap} and stores it in
+   * the MDC when the MDC switch is enabled; otherwise does nothing.
+   *
+   * @param props task/job properties to read the job id from
+   */
+  public static void setJobIdMDC(Map<String, Object> props) {
+    // Checked here as well so the id lookup is skipped entirely when MDC is switched off.
+    if (GovernanceCommonConf.MDC_ENABLED()) {
+      setJobIdMDC(JobUtils.getJobIdFromMap(props));
+    }
+  }
+
+  /**
+   * Removes the job id entry from the MDC. This is done unconditionally, independent of the MDC
+   * switch.
+   */
+  public static void removeJobIdMDC() {
+    MDC.remove(JobRequestConstants.JOB_ID());
+  }
+}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
index bed2d3c0c..67aab4bd4 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
@@ -77,6 +77,9 @@ object GovernanceCommonConf {
val FAKE_PROGRESS: Float = CommonVars[Float]("linkis.job.fake.progress", 0.99f).getValue
+ val MDC_ENABLED =
+ CommonVars("linkis.mdc.log.enabled", true, "MDC Switch").getValue
+
def getEngineEnvValue(envKey: String): String = {
CommonVars(envKey, "").getValue
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index 8ad3031b8..837d91256 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -29,7 +29,7 @@ import org.apache.linkis.ecm.server.listener.EngineConnLaunchStatusChangeEvent
import org.apache.linkis.ecm.server.service.{EngineConnLaunchService, ResourceLocalizationService}
import org.apache.linkis.ecm.server.util.ECMUtils
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
-import org.apache.linkis.governance.common.utils.JobUtils
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus.Failed
import org.apache.linkis.manager.common.entity.node.{AMEngineNode, EngineNode}
@@ -61,8 +61,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
}
override def launchEngineConn(request: EngineConnLaunchRequest, duration: Long): EngineNode = {
- // 1.创建engineConn和runner,launch 并设置基础属性
+ // 1.Create engineConn and runner, launch and set basic properties
val taskId = JobUtils.getJobIdFromStringMap(request.creationDesc.properties)
+ LoggerUtils.setJobIdMDC(taskId)
logger.info("TaskId: {} try to launch a new EngineConn with {}.", taskId: Any, request: Any)
val conn = createEngineConn
val runner = createEngineConnLaunchRunner
@@ -77,9 +78,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
conn.setStatus(NodeStatus.Starting)
conn.setEngineConnInfo(new EngineConnInfo)
conn.setEngineConnManagerEnv(launch.getEngineConnManagerEnv())
- // 2.资源本地化,并且设置ecm的env环境信息
+ // 2.Resource localization, and set the env environment information of ecm
getResourceLocalizationServie.handleInitEngineConnResources(request, conn)
- // 4.run
+ // 3.run
Utils.tryCatch {
beforeLaunch(request, conn, duration)
runner.run()
@@ -95,15 +96,19 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
afterLaunch(request, conn, duration)
val future = Future {
+ LoggerUtils.setJobIdMDC(taskId)
logger.info(
"TaskId: {} with request {} wait engineConn {} start",
Array(taskId, request, conn.getServiceInstance): _*
)
- waitEngineConnStart(request, conn, duration)
+ Utils.tryFinally(waitEngineConnStart(request, conn, duration)) {
+ LoggerUtils.removeJobIdMDC()
+ }
}
future onComplete {
case Failure(t) =>
+ LoggerUtils.setJobIdMDC(taskId)
logger.error(
"TaskId: {} init {} failed. {} with request {}",
Array(
@@ -118,7 +123,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
)
+ LoggerUtils.removeJobIdMDC()
case Success(_) =>
+ LoggerUtils.setJobIdMDC(taskId)
logger.info(
"TaskId: {} init {} succeed. {} with request {}",
Array(
@@ -130,6 +137,7 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
request
): _*
)
+ LoggerUtils.removeJobIdMDC()
}
} { t =>
logger.error(
@@ -159,8 +167,10 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
)
+ LoggerUtils.removeJobIdMDC()
throw t
}
+ LoggerUtils.removeJobIdMDC()
val engineNode = new AMEngineNode()
engineNode.setLabels(conn.getLabels)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncEngineConnJob.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncEngineConnJob.scala
index 8876a50c3..6f73f67fe 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncEngineConnJob.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncEngineConnJob.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.engineconn.computation.executor.async
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.scheduler.executer.{
CompletedExecuteResponse,
ErrorExecuteResponse,
@@ -27,6 +28,7 @@ import org.apache.linkis.scheduler.executer.{
SuccessExecuteResponse
}
import org.apache.linkis.scheduler.queue.{Job, JobInfo}
+import org.apache.linkis.scheduler.queue.SchedulerEventState.SchedulerEventState
class AsyncEngineConnJob(task: EngineConnTask, engineExecutionContext: EngineExecutionContext)
extends Job {
@@ -47,6 +49,14 @@ class AsyncEngineConnJob(task: EngineConnTask, engineExecutionContext: EngineExe
override def close(): Unit = {}
+ override def transition(state: SchedulerEventState): Unit = Utils.tryFinally {
+ val jobId = JobUtils.getJobIdFromMap(task.getProperties)
+ LoggerUtils.setJobIdMDC(jobId)
+ super.transition(state)
+ } {
+ LoggerUtils.removeJobIdMDC()
+ }
+
override def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = {
var executeCompletedNew: CompletedExecuteResponse = executeCompleted
executeCompleted match {
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncExecuteExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncExecuteExecutor.scala
index a27d3f029..46332b93f 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncExecuteExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncExecuteExecutor.scala
@@ -17,8 +17,10 @@
package org.apache.linkis.engineconn.computation.executor.async
+import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.common.exception.EngineConnException
import org.apache.linkis.engineconn.computation.executor.utlis.ComputationErrorCode
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.scheduler.executer._
import org.apache.linkis.scheduler.executer.ExecutorState.ExecutorState
@@ -31,10 +33,16 @@ class AsyncExecuteExecutor(executor: AsyncConcurrentComputationExecutor) extends
override def execute(executeRequest: ExecuteRequest): ExecuteResponse = {
executeRequest match {
case asyncExecuteRequest: AsyncExecuteRequest =>
- executor.asyncExecuteTask(
- asyncExecuteRequest.task,
- asyncExecuteRequest.engineExecutionContext
- )
+ Utils.tryFinally {
+ val jobId = JobUtils.getJobIdFromMap(asyncExecuteRequest.task.getProperties)
+ LoggerUtils.setJobIdMDC(jobId)
+ executor.asyncExecuteTask(
+ asyncExecuteRequest.task,
+ asyncExecuteRequest.engineExecutionContext
+ )
+ } {
+ LoggerUtils.removeJobIdMDC()
+ }
case _ =>
throw EngineConnException(
ComputationErrorCode.ASYNC_EXECUTOR_ERROR_CODE,
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index 00622da92..98a6c2b21 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -35,6 +35,7 @@ import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.paser.CodeParser
import org.apache.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -251,7 +252,9 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
}
}
- def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+ def execute(engineConnTask: EngineConnTask): ExecuteResponse = Utils.tryFinally {
+ val jobId = JobUtils.getJobIdFromMap(engineConnTask.getProperties)
+ LoggerUtils.setJobIdMDC(jobId)
logger.info(s"start to execute task ${engineConnTask.getTaskId}")
updateLastActivityTime()
beforeExecute(engineConnTask)
@@ -264,6 +267,8 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
Utils.tryAndWarn(afterExecute(engineConnTask, response))
logger.info(s"Finished to execute task ${engineConnTask.getTaskId}")
response
+ } {
+ LoggerUtils.removeJobIdMDC()
}
def setCodeParser(codeParser: CodeParser): Unit = this.codeParser = Some(codeParser)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 80903ef57..93d607c25 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -57,7 +57,7 @@ import org.apache.linkis.governance.common.exception.engineconn.{
EngineConnExecutorErrorException
}
import org.apache.linkis.governance.common.protocol.task._
-import org.apache.linkis.governance.common.utils.JobUtils
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.resource.{
ResponseTaskRunningInfo,
@@ -168,82 +168,92 @@ class TaskExecutionServiceImpl
}
@Receiver
- override def execute(requestTask: RequestTask, sender: Sender): ExecuteResponse = {
-
- // check lock
- logger.info("Received a new task, task content is " + requestTask)
- if (StringUtils.isBlank(requestTask.getLock)) {
- logger.error(s"Invalid lock : ${requestTask.getLock} , requestTask : " + requestTask)
- return ErrorExecuteResponse(
- s"Invalid lock : ${requestTask.getLock}.",
- new EngineConnExecutorErrorException(
- EngineConnExecutorErrorCode.INVALID_PARAMS,
- "Invalid lock or code(请获取到锁后再提交任务.)"
+ override def execute(requestTask: RequestTask, sender: Sender): ExecuteResponse =
+ Utils.tryFinally {
+ val jobId = JobUtils.getJobIdFromMap(requestTask.getProperties)
+ LoggerUtils.setJobIdMDC(jobId)
+ // check lock
+ logger.info("Received a new task, task content is " + requestTask)
+ if (StringUtils.isBlank(requestTask.getLock)) {
+ logger.error(s"Invalid lock : ${requestTask.getLock} , requestTask : " + requestTask)
+ return ErrorExecuteResponse(
+ s"Invalid lock : ${requestTask.getLock}.",
+ new EngineConnExecutorErrorException(
+ EngineConnExecutorErrorCode.INVALID_PARAMS,
+ "Invalid lock or code(请获取到锁后再提交任务.)"
+ )
)
- )
- }
- if (!lockService.isLockExist(requestTask.getLock)) {
- logger.error(s"Lock ${requestTask.getLock} not exist, cannot execute.")
- return ErrorExecuteResponse(
- "Lock not exixt",
- new EngineConnExecutorErrorException(
- EngineConnExecutorErrorCode.INVALID_LOCK,
- "Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)."
+ }
+ if (!lockService.isLockExist(requestTask.getLock)) {
+ logger.error(s"Lock ${requestTask.getLock} not exist, cannot execute.")
+ return ErrorExecuteResponse(
+ "Lock not exixt",
+ new EngineConnExecutorErrorException(
+ EngineConnExecutorErrorCode.INVALID_LOCK,
+ "Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)."
+ )
)
- )
- }
+ }
- if (StringUtils.isBlank(requestTask.getCode)) {
- return IncompleteExecuteResponse(
- "Your code is incomplete, it may be that only comments are selected for execution(您的代码不完整,可能是仅仅选中了注释进行执行)"
- )
- }
+ if (StringUtils.isBlank(requestTask.getCode)) {
+ return IncompleteExecuteResponse(
+ "Your code is incomplete, it may be that only comments are selected for execution(您的代码不完整,可能是仅仅选中了注释进行执行)"
+ )
+ }
- val taskId: Int = taskExecutedNum.incrementAndGet()
- val retryAble: Boolean = {
- val retry =
- requestTask.getProperties.getOrDefault(ComputationEngineConstant.RETRYABLE_TYPE_NAME, null)
- if (null != retry) retry.asInstanceOf[Boolean]
- else false
- }
- val jobId = JobUtils.getJobIdFromMap(requestTask.getProperties)
- if (StringUtils.isNotBlank(jobId)) {
- System.getProperties.put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
- logger.info(s"Received job with id ${jobId}.")
- }
- val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
- task.setCode(requestTask.getCode)
- task.setProperties(requestTask.getProperties)
- task.data(ComputationEngineConstant.LOCK_TYPE_NAME, requestTask.getLock)
- task.setStatus(ExecutionNodeStatus.Scheduled)
- val labels = requestTask.getLabels.asScala.toArray
- task.setLabels(labels)
- val entranceServerInstance = RPCUtils.getServiceInstanceFromSender(sender)
- task.setCallbackServiceInstance(entranceServerInstance)
- logger.info(s"task $taskId submit executor to execute")
- val runnable = new Runnable {
- override def run(): Unit = Utils.tryCatch {
- // Waiting to run, preventing task messages from being sent to submit services before SubmitResponse, such as entry
- Thread.sleep(ComputationExecutorConf.TASK_SUBMIT_WAIT_TIME_MS)
- submitTaskToExecutor(task, labels) match {
- case ErrorExecuteResponse(message, throwable) =>
- sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
- logger.error(message, throwable)
- sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
- case _ =>
+ val taskId: Int = taskExecutedNum.incrementAndGet()
+ val retryAble: Boolean = {
+ val retry =
+ requestTask.getProperties.getOrDefault(
+ ComputationEngineConstant.RETRYABLE_TYPE_NAME,
+ null
+ )
+ if (null != retry) retry.asInstanceOf[Boolean]
+ else false
+ }
+
+ if (StringUtils.isNotBlank(jobId)) {
+ System.getProperties.put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
+ logger.info(s"Received job with id ${jobId}.")
+ }
+ val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
+ task.setCode(requestTask.getCode)
+ task.setProperties(requestTask.getProperties)
+ task.data(ComputationEngineConstant.LOCK_TYPE_NAME, requestTask.getLock)
+ task.setStatus(ExecutionNodeStatus.Scheduled)
+ val labels = requestTask.getLabels.asScala.toArray
+ task.setLabels(labels)
+ val entranceServerInstance = RPCUtils.getServiceInstanceFromSender(sender)
+ task.setCallbackServiceInstance(entranceServerInstance)
+ logger.info(s"task $taskId submit executor to execute")
+ val runnable = new Runnable {
+ override def run(): Unit = Utils.tryCatch {
+ // Waiting to run, preventing task messages from being sent to submit services before SubmitResponse, such as entry
+ Thread.sleep(ComputationExecutorConf.TASK_SUBMIT_WAIT_TIME_MS)
+ LoggerUtils.setJobIdMDC(jobId)
+ submitTaskToExecutor(task, labels) match {
+ case ErrorExecuteResponse(message, throwable) =>
+ sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
+ logger.error(message, throwable)
+ sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
+ case _ =>
+ }
+ LoggerUtils.removeJobIdMDC()
+ } { t =>
+ logger.warn("Failed to submit task ", t)
+ LoggerUtils.removeJobIdMDC()
+ sendToEntrance(
+ task,
+ ResponseTaskError(task.getTaskId, ExceptionUtils.getRootCauseMessage(t))
+ )
+ sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
}
- } { t =>
- logger.warn("Failed to submit task ", t)
- sendToEntrance(
- task,
- ResponseTaskError(task.getTaskId, ExceptionUtils.getRootCauseMessage(t))
- )
- sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
}
+ val submitTaskToExecutorFuture = taskAsyncSubmitExecutor.submit(runnable)
+ SubmitResponse(task.getTaskId)
+ } {
+ LoggerUtils.removeJobIdMDC()
}
- val submitTaskToExecutorFuture = taskAsyncSubmitExecutor.submit(runnable)
- SubmitResponse(task.getTaskId)
- }
private def submitTaskToExecutor(
task: CommonEngineConnTask,
@@ -254,11 +264,11 @@ class TaskExecutionServiceImpl
case computationExecutor: ComputationExecutor =>
taskIdCache.put(task.getTaskId, computationExecutor)
submitTask(task, computationExecutor)
- case o =>
+ case _ =>
val labelsStr =
if (labels != null) labels.filter(_ != null).map(_.getStringValue).mkString(",") else ""
val msg =
- "Invalid computationExecutor : " + o.getClass.getName + ", labels : " + labelsStr + ", requestTask : " + task.getTaskId
+ "Invalid computationExecutor : " + executor.getClass.getName + ", labels : " + labelsStr + ", requestTask : " + task.getTaskId
logger.error(msg)
ErrorExecuteResponse(
"Invalid computationExecutor(生成无效的计算引擎,请联系管理员).",
@@ -386,18 +396,23 @@ class TaskExecutionServiceImpl
new Thread(consumerRunnable)
}
- private def executeTask(task: EngineConnTask, executor: ComputationExecutor): Unit = {
- val response = executor.execute(task)
- response match {
- case ErrorExecuteResponse(message, throwable) =>
- sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
- logger.error(message, throwable)
- LogHelper.pushAllRemainLogs()
- executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
- case _ => logger.warn(s"task get response is $response")
+ private def executeTask(task: EngineConnTask, executor: ComputationExecutor): Unit =
+ Utils.tryFinally {
+ val jobId = JobUtils.getJobIdFromMap(task.getProperties)
+ LoggerUtils.setJobIdMDC(jobId)
+ val response = executor.execute(task)
+ response match {
+ case ErrorExecuteResponse(message, throwable) =>
+ sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
+ logger.error(message, throwable)
+ LogHelper.pushAllRemainLogs()
+ executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
+ case _ => logger.warn(s"task get response is $response")
+ }
+ clearCache(task.getTaskId)
+ } {
+ LoggerUtils.removeJobIdMDC()
}
- clearCache(task.getTaskId)
- }
/**
* Open daemon thread
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
index 26a920379..84ab6fb7c 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
@@ -28,6 +28,7 @@ import org.apache.linkis.engineconn.acessible.executor.listener.event.{
ExecutorUnLockEvent
}
import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.SensibleExecutor
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
@@ -169,9 +170,18 @@ class EngineConnTimedLock(private var timeout: Long)
}
private def unlockCallback(lockStr: String): Unit = {
- /* if (null != lockedBy) {
- lockedBy.transition(NodeStatus.Unlock)
- } */
+ val nodeStatus = ExecutorManager.getInstance.getReportExecutor match {
+ case sensibleExecutor: SensibleExecutor =>
+ sensibleExecutor.getStatus
+ case _ => NodeStatus.Idle
+ }
+ if (NodeStatus.isCompleted(nodeStatus)) {
+ logger.info(
+ "The node({}) is already in the completed state, and the unlocking is invalid",
+ nodeStatus.toString
+ )
+ return
+ }
val executors = ExecutorManager.getInstance.getExecutors.filter(executor =>
null != executor && !executor.isClosed
)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index f298e5425..1035de1e2 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -28,6 +28,7 @@ import org.apache.linkis.entrance.log.LogReader
import org.apache.linkis.entrance.timeout.JobTimeoutManager
import org.apache.linkis.entrance.utils.JobHistoryHelper
import org.apache.linkis.governance.common.entity.job.JobRequest
+import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
@@ -78,12 +79,15 @@ abstract class EntranceServer extends Logging {
}
logger.info(s"received a request,convert $jobRequest")
+ LoggerUtils.setJobIdMDC(jobRequest.getId.toString)
+
val logAppender = new java.lang.StringBuilder()
Utils.tryThrow(
getEntranceContext
.getOrCreateEntranceInterceptors()
.foreach(int => jobRequest = int.apply(jobRequest, logAppender))
) { t =>
+ LoggerUtils.removeJobIdMDC()
val error = t match {
case error: ErrorException => error
case t1: Throwable =>
@@ -165,8 +169,10 @@ abstract class EntranceServer extends Logging {
entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
case _ =>
}
+ LoggerUtils.removeJobIdMDC()
job
} { t =>
+ LoggerUtils.removeJobIdMDC()
job.onFailure("Submitting the query failed!(提交查询失败!)", t)
val _jobRequest: JobRequest =
getEntranceContext.getOrCreateEntranceParser().parseToJobRequest(job)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index c509f1005..266de6eb5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -25,6 +25,7 @@ import org.apache.linkis.entrance.orchestrator.EntranceOrchestrationFactory
import org.apache.linkis.entrance.utils.JobHistoryHelper
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.protocol.task.ResponseTaskStatus
+import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
import org.apache.linkis.manager.label.utils.LabelUtil
@@ -106,6 +107,7 @@ class DefaultEntranceExecutor(id: Long)
entranceExecuteRequest: EntranceExecuteRequest,
orchestration: Orchestration
): Unit = {
+ LoggerUtils.setJobIdMDC(getId.toString)
orchestrationResponse match {
case succeedResponse: SucceedTaskResponse =>
succeedResponse match {
@@ -184,6 +186,7 @@ class DefaultEntranceExecutor(id: Long)
_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(msg))
)
}
+ LoggerUtils.removeJobIdMDC()
}
def requestToComputationJobReq(entranceExecuteRequest: EntranceExecuteRequest): JobReq = {
@@ -238,11 +241,13 @@ class DefaultEntranceExecutor(id: Long)
}
override def kill(): Boolean = {
+ LoggerUtils.setJobIdMDC(getId.toString)
logger.info("Entrance start to kill job {} invoke Orchestrator ", this.getId)
Utils.tryAndWarn {
val msg = s"You job with id was cancelled by user!"
getRunningOrchestrationFuture.foreach(_.cancel(msg))
}
+ LoggerUtils.removeJobIdMDC()
true
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index 44cb3620c..be7fb1387 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -19,9 +19,11 @@ package org.apache.linkis.entrance.execute
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.job.EntranceExecuteRequest
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus._
import org.apache.linkis.governance.common.protocol.task.{RequestTask, ResponseTaskStatus}
+import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.orchestrator.computation.operation.log.LogProcessor
import org.apache.linkis.orchestrator.computation.operation.progress.ProgressProcessor
import org.apache.linkis.orchestrator.computation.operation.resource.ResourceReportProcessor
@@ -29,6 +31,7 @@ import org.apache.linkis.orchestrator.core.OrchestrationFuture
import org.apache.linkis.protocol.UserWithCreator
import org.apache.linkis.scheduler.executer._
import org.apache.linkis.scheduler.executer.ExecutorState.ExecutorState
+import org.apache.linkis.server.BDPJettyServerHelper
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
@@ -69,9 +72,14 @@ abstract class EntranceExecutor(val id: Long) extends Executor with Logging {
}
override def execute(executeRequest: ExecuteRequest): ExecuteResponse = {
- var request: RequestTask = null
- interceptors.foreach(in => request = in.apply(request, executeRequest))
- callExecute(executeRequest)
+ LoggerUtils.setJobIdMDC(getId.toString)
+ Utils.tryFinally {
+ var request: RequestTask = null
+ interceptors.foreach(in => request = in.apply(request, executeRequest))
+ callExecute(executeRequest)
+ } {
+ LoggerUtils.removeJobIdMDC()
+ }
}
protected def callback(): Unit = {}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 0d5d60598..05bc5311b 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -90,7 +90,7 @@ abstract class EntranceExecutorManager(groupFactory: GroupFactory)
job.getJobRequest match {
case jobReq: JobRequest =>
val entranceEntranceExecutor =
- new DefaultEntranceExecutor(idGenerator.incrementAndGet())
+ new DefaultEntranceExecutor(jobReq.getId)
// getEngineConn Executor
job.getLogListener.foreach(
_.onLogUpdate(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.java
index 73d3c4770..1458680c9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.java
@@ -17,6 +17,7 @@
package org.apache.linkis.manager.am.pointer;
+import org.apache.linkis.common.exception.LinkisRetryException;
import org.apache.linkis.manager.am.exception.AMErrorException;
import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.node.EngineNode;
@@ -64,9 +65,9 @@ public class DefaultEMNodPointer extends AbstractNodePointer implements EMNodPoi
+ engineNode);
return engineNode;
} else {
- throw new AMErrorException(
+ // Separate the service instance from the appended result so the message stays readable.
+ throw new LinkisRetryException(
AMConstant.ENGINE_ERROR_CODE,
- "Failed to createEngine ask em " + getNode().getServiceInstance());
+ "Failed to createEngine ask em " + getNode().getServiceInstance() + ", result: " + result);
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
index a2c4815f1..cc17e68a9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
@@ -19,6 +19,7 @@ package org.apache.linkis.manager.am.service.engine;
import org.apache.linkis.common.exception.LinkisRetryException;
import org.apache.linkis.governance.common.utils.JobUtils;
+import org.apache.linkis.governance.common.utils.LoggerUtils;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.common.constant.AMConstant;
@@ -68,6 +69,7 @@ public class DefaultEngineAskEngineService extends AbstractEngineService
@Receiver
public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) {
String taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
+ LoggerUtils.setJobIdMDC(taskId);
logger.info(
String.format(
"received task: %s, engineAskRequest %s", taskId, engineAskRequest.toString()));
@@ -107,42 +109,49 @@ public class DefaultEngineAskEngineService extends AbstractEngineService
CompletableFuture<EngineNode> createNodeThread =
CompletableFuture.supplyAsync(
() -> {
- logger.info(
- String.format(
- "Task: %s start to async(%s) createEngine, %s",
- taskId, engineAskAsyncId, engineAskRequest.getCreateService()));
- engineAskRequest.getLabels().remove("engineInstance");
- EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
- engineCreateRequest.setLabels(engineAskRequest.getLabels());
- engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
- engineCreateRequest.setUser(engineAskRequest.getUser());
- engineCreateRequest.setProperties(engineAskRequest.getProperties());
- engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
- EngineNode createNode = engineCreateService.createEngine(engineCreateRequest, sender);
-
- long timeout =
- engineCreateRequest.getTimeout() <= 0
- ? AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong()
- : engineCreateRequest.getTimeout();
- EngineNode createEngineNode = getEngineNodeManager().useEngine(createNode, timeout);
- if (createEngineNode == null) {
- String message =
+ try {
+ LoggerUtils.setJobIdMDC(taskId);
+ logger.info(
+ String.format(
+ "Task: %s start to async(%s) createEngine, %s",
+ taskId, engineAskAsyncId, engineAskRequest.getCreateService()));
+ engineAskRequest.getLabels().remove("engineInstance");
+ EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
+ engineCreateRequest.setLabels(engineAskRequest.getLabels());
+ engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
+ engineCreateRequest.setUser(engineAskRequest.getUser());
+ engineCreateRequest.setProperties(engineAskRequest.getProperties());
+ engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
+ EngineNode createNode =
+ engineCreateService.createEngine(engineCreateRequest, sender);
+
+ long timeout =
+ engineCreateRequest.getTimeout() <= 0
+ ? AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong()
+ : engineCreateRequest.getTimeout();
+ EngineNode createEngineNode = getEngineNodeManager().useEngine(createNode, timeout);
+ if (createEngineNode == null) {
+ String message =
+ String.format(
+ "create engine%s success, but to use engine failed",
+ createNode.getServiceInstance());
+ throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, message);
+ }
+
+ logger.info(
String.format(
- "create engine%s success, but to use engine failed",
- createNode.getServiceInstance());
- throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, message);
+ "Task: %s finished to ask engine for user %s by create node %s",
+ taskId, engineAskRequest.getUser(), createEngineNode));
+ return createEngineNode;
+ } finally {
+ LoggerUtils.removeJobIdMDC();
}
-
- logger.info(
- String.format(
- "Task: %s finished to ask engine for user %s by create node %s",
- taskId, engineAskRequest.getUser(), createEngineNode));
- return createEngineNode;
},
EXECUTOR);
createNodeThread.whenComplete(
(EngineNode engineNode, Throwable exception) -> {
+ LoggerUtils.setJobIdMDC(taskId);
if (exception != null) {
boolean retryFlag;
if (exception instanceof LinkisRetryException) {
@@ -179,9 +188,11 @@ public class DefaultEngineAskEngineService extends AbstractEngineService
String.format(
"Task: %s Success to async(%s) createEngine %s",
taskId, engineAskAsyncId, engineNode));
- sender.send(new EngineCreateSuccess(engineAskAsyncId, (EngineNode) engineNode));
+ sender.send(new EngineCreateSuccess(engineAskAsyncId, engineNode));
}
+ LoggerUtils.removeJobIdMDC();
});
+ LoggerUtils.removeJobIdMDC();
return new EngineAskAsyncResponse(engineAskAsyncId, Sender.getThisServiceInstance());
}
diff --git a/linkis-dist/package/conf/log4j2.xml b/linkis-dist/package/conf/log4j2.xml
index f91717325..c8721b646 100644
--- a/linkis-dist/package/conf/log4j2.xml
+++ b/linkis-dist/package/conf/log4j2.xml
@@ -24,7 +24,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${LOG_PATH}/${LOG_FILE}.log"
filePattern="${LOG_PATH}/$${date:yyyy-MM}/${LOG_FILE}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/elasticsearch/src/main/resources/log4j2.xml
index 020b94567..0aecbbe74 100644
--- a/linkis-engineconn-plugins/elasticsearch/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/elasticsearch/src/main/resources/log4j2.xml
@@ -20,23 +20,23 @@
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Send name="SendPackage" >
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
diff --git a/linkis-engineconn-plugins/flink/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/flink/src/main/resources/log4j2.xml
index 4e2c961ae..c3741cef3 100644
--- a/linkis-engineconn-plugins/flink/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/flink/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index 50a36c0ed..8b5e42d15 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS:-logs}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -29,18 +29,18 @@
<File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp">
<RegexFilter regex=".*application_.*" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Send name="SendPackage" >
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
diff --git a/linkis-engineconn-plugins/io_file/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/io_file/src/main/resources/log4j2.xml
index 75a5c2f5d..bd868ef5c 100644
--- a/linkis-engineconn-plugins/io_file/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/io_file/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/jdbc/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/jdbc/src/main/resources/log4j2.xml
index 83186732f..2e36367c4 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/jdbc/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/openlookeng/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/openlookeng/src/main/resources/log4j2.xml
index e9e7bb70f..35ee72662 100644
--- a/linkis-engineconn-plugins/openlookeng/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/openlookeng/src/main/resources/log4j2.xml
@@ -20,17 +20,17 @@
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<File name="stderr" fileName="${env:PWD}/logs/stderr" append="true">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/pipeline/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/pipeline/src/main/resources/log4j2.xml
index 03517a50e..86102506c 100644
--- a/linkis-engineconn-plugins/pipeline/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/pipeline/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/presto/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/presto/src/main/resources/log4j2.xml
index 020b94567..2cd3e264c 100644
--- a/linkis-engineconn-plugins/presto/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/presto/src/main/resources/log4j2.xml
@@ -20,23 +20,23 @@
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Send name="SendPackage" >
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
diff --git a/linkis-engineconn-plugins/python/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/python/src/main/resources/log4j2.xml
index a385c888f..1b04ca299 100644
--- a/linkis-engineconn-plugins/python/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/python/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/seatunnel/src/main/resources/log4j2.xml
index 83186732f..2e36367c4 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/seatunnel/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -31,12 +31,12 @@
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
index ee7557457..2d288889a 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
+++ b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -29,14 +29,14 @@
<File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/yarnApp">
<RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
index 1158cefb5..298cf8cf7 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
@@ -20,7 +20,7 @@
<appenders>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
@@ -29,23 +29,23 @@
<File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp">
<RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Send name="SendPackage" >
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<Console name="stderr" target="SYSTEM_ERR">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
index d93effc17..ab2285203 100644
--- a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
@@ -20,23 +20,23 @@
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<File name="stderr" fileName="${env:PWD}/logs/stderr" append="true">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
<File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/yarnApp">
<RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
</appenders>
<loggers>
diff --git a/linkis-engineconn-plugins/trino/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/trino/src/main/resources/log4j2.xml
index 9cf70f16c..8c3997f3f 100644
--- a/linkis-engineconn-plugins/trino/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/trino/src/main/resources/log4j2.xml
@@ -20,16 +20,16 @@
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Send>
<File name="stderr" fileName="${env:PWD}/logs/stderr" append="true">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</File>
</appenders>
<loggers>
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
index f5fc6d550..ab404dba6 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
@@ -34,6 +34,7 @@ import scala.collection.mutable.ArrayBuffer
*/
class TaskPlannerTransform extends PlannerTransform with Logging {
+ @Deprecated
def rebuildTreeNode(tmpTask: Task): Task = {
tmpTask.getChildren.foreach(child => {
val newParents = child.getParents.clone() :+ tmpTask
@@ -42,6 +43,7 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
tmpTask
}
+ @Deprecated
def buildCodeLogicTaskTree(
codeLogicalUnit: CodeLogicalUnit = null,
stage: Stage,
@@ -54,6 +56,7 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
(rebuildTreeNode(codeLogicalUnitTaskTmp), newStartJobTask)
}
+ @Deprecated
def buildStageTaskTree(taskDesc: StageTaskDesc, startJobTask: Task = null): (Task, Task) = {
taskDesc match {
case endStageTask: EndStageTaskDesc =>
@@ -102,6 +105,7 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
}
}
+ @Deprecated
def buildAllStageTaskTree(
stages: Array[Stage],
startJobTask: Task = null
@@ -117,6 +121,7 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
(stageTasks.toArray, reusedStartJobTask)
}
+ @Deprecated
def buildJobTaskTree(taskDesc: TaskDesc): Task = {
taskDesc match {
case startTask: StartJobTaskDesc =>
@@ -136,18 +141,29 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
override def apply(in: Job, context: ASTContext): Task = {
in match {
case job: CodeJob =>
- // TODO rebuild needed: Notice( Stages maybe have dependency relation.)
- // TODO This class should be split into two kind of transforms.
- // TODO First, two PlannerTransforms are needed: one to transform Job to JobTaskEnd, one to transform Job to StageTaskEnd.
- // TODO Second, AnalyzeTransforms are needed: one for adding a computationTask by stage for no computation strategy,
- // one to transform Job to JobTaskStart, one to transform Job to StageTaskStart.
- buildJobTaskTree(EndJobTaskDesc(job))
+ val taskDesc = EndJobTaskDesc(job)
+ val jobTaskTmp =
+ new JobTask(Array(), Array(buildCodeLogicTaskTree(job.getCodeLogicalUnit, job)))
+ jobTaskTmp.setTaskDesc(taskDesc)
+ rebuildNewTreeNode(jobTaskTmp)
case _ =>
logger.error(s"unknown job type:${in.getClass} ")
null
}
}
+ def rebuildNewTreeNode(tmpTask: Task): Task = {
+ tmpTask.getChildren.foreach(_.withNewParents(Array(tmpTask)))
+ tmpTask
+ }
+
+ def buildCodeLogicTaskTree(codeLogicalUnit: CodeLogicalUnit, job: Job): Task = {
+ val codeLogicalUnitTaskTmp = new CodeLogicalUnitTask(Array(), Array())
+ codeLogicalUnitTaskTmp.setTaskDesc(CodeLogicalUnitTaskDesc(job))
+ if (codeLogicalUnit != null) codeLogicalUnitTaskTmp.setCodeLogicalUnit(codeLogicalUnit)
+ codeLogicalUnitTaskTmp
+ }
+
override def getName: String = {
val className = getClass.getName
if (className endsWith "$") className.dropRight(1) else className
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
index 1e67787f3..9be30216d 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
@@ -17,8 +17,7 @@
package org.apache.linkis.orchestrator.computation.service
-import org.apache.linkis.common.log.LogUtils
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Logging
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.protocol.task._
import org.apache.linkis.manager.common.protocol.resource.ResponseTaskRunningInfo
@@ -28,6 +27,7 @@ import org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor
import org.apache.linkis.orchestrator.core.ResultSet
import org.apache.linkis.orchestrator.ecm.service.TaskExecutionReceiver
import org.apache.linkis.orchestrator.listener.task._
+import org.apache.linkis.orchestrator.utils.OrchestratorLoggerUtils
import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.message.annotation.Receiver
import org.apache.linkis.rpc.utils.RPCUtils
@@ -91,6 +91,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
codeExecTaskExecutorManager
.getByEngineConnAndTaskId(serviceInstance, taskStatus.execId)
.foreach { codeExecutor =>
+ OrchestratorLoggerUtils.setJobIdMDC(codeExecutor.getExecTask)
val event = TaskStatusEvent(codeExecutor.getExecTask, taskStatus.status)
logger.info(
s"From engineConn receive status info:$taskStatus, now post to listenerBus event: $event"
@@ -111,6 +112,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
)
}
}
+ OrchestratorLoggerUtils.removeJobIdMDC()
}
@Receiver
@@ -123,6 +125,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
codeExecTaskExecutorManager
.getByEngineConnAndTaskId(serviceInstance, taskResultSize.execId)
.foreach { codeExecutor =>
+ OrchestratorLoggerUtils.setJobIdMDC(codeExecutor.getExecTask)
val event = TaskResultSetSizeEvent(codeExecutor.getExecTask, taskResultSize.resultSize)
logger.info(
s"From engineConn receive resultSet size info$taskResultSize, now post to listenerBus event: $event"
@@ -134,6 +137,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
if (!isExist) {
logger.warn(s"from $serviceInstance received $taskResultSize cannot find execTask to deal")
}
+ OrchestratorLoggerUtils.removeJobIdMDC()
}
@Receiver
@@ -143,6 +147,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
codeExecTaskExecutorManager
.getByEngineConnAndTaskId(serviceInstance, taskResultSet.execId)
.foreach { codeExecutor =>
+ OrchestratorLoggerUtils.setJobIdMDC(codeExecutor.getExecTask)
val event = TaskResultSetEvent(
codeExecutor.getExecTask,
ResultSet(taskResultSet.output, taskResultSet.alias)
@@ -157,6 +162,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
if (!isExist) {
logger.warn(s"from $serviceInstance received $taskResultSet cannot find execTask to deal")
}
+ OrchestratorLoggerUtils.removeJobIdMDC()
}
@Receiver
@@ -166,6 +172,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
codeExecTaskExecutorManager
.getByEngineConnAndTaskId(serviceInstance, responseTaskError.execId)
.foreach { codeExecutor =>
+ OrchestratorLoggerUtils.setJobIdMDC(codeExecutor.getExecTask)
val event = TaskErrorResponseEvent(codeExecutor.getExecTask, responseTaskError.errorMsg)
logger.info(
s"From engineConn receive responseTaskError info${responseTaskError.execId}, now post to listenerBus event: ${event.execTask.getIDInfo()}"
@@ -177,7 +184,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
if (!isExist) {
logger.warn(s"from $serviceInstance received $responseTaskError cannot find execTask to deal")
}
-
+ OrchestratorLoggerUtils.removeJobIdMDC()
}
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncExecTaskRunnerImpl.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncExecTaskRunnerImpl.scala
index d546889b6..459bf7f38 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncExecTaskRunnerImpl.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncExecTaskRunnerImpl.scala
@@ -17,20 +17,16 @@
package org.apache.linkis.orchestrator.strategy.async
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
import org.apache.linkis.orchestrator.core.ResultSet
import org.apache.linkis.orchestrator.exception.OrchestratorErrorCodeSummary
-import org.apache.linkis.orchestrator.execution.{ArrayResultSetTaskResponse, _}
+import org.apache.linkis.orchestrator.execution._
import org.apache.linkis.orchestrator.execution.impl.{
DefaultFailedTaskResponse,
DefaultResultSetTaskResponse
}
-import org.apache.linkis.orchestrator.listener.{
- OrchestratorListenerBusContext,
- OrchestratorSyncListenerBus
-}
import org.apache.linkis.orchestrator.listener.execution.ExecTaskRunnerCompletedEvent
import org.apache.linkis.orchestrator.plans.physical.ExecTask
import org.apache.linkis.orchestrator.strategy.{
@@ -38,6 +34,7 @@ import org.apache.linkis.orchestrator.strategy.{
ResultSetExecTask,
StatusInfoExecTask
}
+import org.apache.linkis.orchestrator.utils.OrchestratorLoggerUtils
import scala.collection.mutable.ArrayBuffer
@@ -66,25 +63,26 @@ class AsyncExecTaskRunnerImpl(override val task: ExecTask)
override def isSucceed: Boolean = ExecutionNodeStatus.isScheduled(status)
- override def run(): Unit = try {
- logger.info(s"ExecTaskRunner Submit execTask(${task.getIDInfo}) to running")
- val response = task.execute()
- this.taskResponse = response
- response match {
- case async: AsyncTaskResponse =>
- transientStatus(ExecutionNodeStatus.Running)
- case succeed: SucceedTaskResponse =>
- logger.info(s"Succeed to execute ExecTask(${task.getIDInfo})")
- transientStatus(ExecutionNodeStatus.Succeed)
- case failedTaskResponse: FailedTaskResponse =>
- logger.info(s"Failed to execute ExecTask(${task.getIDInfo})")
- transientStatus(ExecutionNodeStatus.Failed)
- case retry: RetryTaskResponse =>
- logger.warn(s"ExecTask(${task.getIDInfo}) need to retry")
- transientStatus(ExecutionNodeStatus.WaitForRetry)
- }
- } catch {
- case e: Throwable =>
+ override def run(): Unit = {
+ Utils.tryCatch {
+ OrchestratorLoggerUtils.setJobIdMDC(task)
+ logger.info(s"ExecTaskRunner Submit execTask(${task.getIDInfo}) to running")
+ val response = task.execute()
+ this.taskResponse = response
+ response match {
+ case async: AsyncTaskResponse =>
+ transientStatus(ExecutionNodeStatus.Running)
+ case succeed: SucceedTaskResponse =>
+ logger.info(s"Succeed to execute ExecTask(${task.getIDInfo})")
+ transientStatus(ExecutionNodeStatus.Succeed)
+ case failedTaskResponse: FailedTaskResponse =>
+ logger.info(s"Failed to execute ExecTask(${task.getIDInfo})")
+ transientStatus(ExecutionNodeStatus.Failed)
+ case retry: RetryTaskResponse =>
+ logger.warn(s"ExecTask(${task.getIDInfo}) need to retry")
+ transientStatus(ExecutionNodeStatus.WaitForRetry)
+ }
+ } { case e: Throwable =>
logger.error(s"Failed to execute task ${task.getIDInfo}", e)
this.taskResponse = new DefaultFailedTaskResponse(
e.getMessage,
@@ -92,6 +90,8 @@ class AsyncExecTaskRunnerImpl(override val task: ExecTask)
e
)
transientStatus(ExecutionNodeStatus.Failed)
+ }
+ OrchestratorLoggerUtils.removeJobIdMDC()
}
override def transientStatus(status: ExecutionNodeStatus): Unit = {
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/utils/OrchestratorLoggerUtils.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/utils/OrchestratorLoggerUtils.scala
new file mode 100644
index 000000000..e249c675b
--- /dev/null
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/utils/OrchestratorLoggerUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.orchestrator.utils
+
+import org.apache.linkis.governance.common.utils.LoggerUtils
+import org.apache.linkis.orchestrator.plans.physical.ExecTask
+
+object OrchestratorLoggerUtils {
+
+ def setJobIdMDC(task: ExecTask): Unit = {
+ val startUpMap =
+ task.getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getParams.getStartupParams
+ if (null != startUpMap) {
+ LoggerUtils.setJobIdMDC(startUpMap.getConfigurationMap())
+ }
+ }
+
+ def removeJobIdMDC(): Unit = {
+ LoggerUtils.removeJobIdMDC()
+ }
+
+}
diff --git a/linkis-orchestrator/pom.xml b/linkis-orchestrator/pom.xml
index afc94da05..f424c3828 100644
--- a/linkis-orchestrator/pom.xml
+++ b/linkis-orchestrator/pom.xml
@@ -21,6 +21,7 @@
<groupId>org.apache.linkis</groupId>
<artifactId>linkis</artifactId>
<version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>linkis-orchestrator</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org