You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/07 12:34:15 UTC
[incubator-linkis] branch dev-1.1.3 updated: linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
new adfc90243 linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
adfc90243 is described below
commit adfc90243796286043b52c82a2ea58080187d37a
Author: Alexyang <xu...@qq.com>
AuthorDate: Tue Jun 7 20:34:11 2022 +0800
linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
---
.../linkis/scheduler/queue/SchedulerEvent.scala | 6 ++++++
.../entrance/timeout/JobTimeoutManager.scala | 25 ++++++++++++----------
2 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index ede98987c..d15ba840a 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -33,6 +33,12 @@ trait SchedulerEvent extends Logging {
def getEndTime = endTime
def getStartTime = startTime
+
+ /*
+ * To be compatible with old versions.
+ * It's not recommended to use scheduledTime, which is only a few milliseconds in most cases.
+ */
+ @Deprecated
def getScheduledTime = scheduledTime
def getId = id
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
index 3c14ec211..b9ec38941 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
@@ -75,21 +75,24 @@ class JobTimeoutManager extends Logging {
private def timeoutDetective(): Unit = {
if (timeoutCheck) {
def checkAndSwitch(job: EntranceJob): Unit = {
- info(s"Checking whether the job timed out: ${job.getId()}")
- val currentTime = System.currentTimeMillis() / 1000
- val queuingTime = currentTime - job.getScheduledTime / 1000
- val runningTime = currentTime - job.getStartTime / 1000
+ info(s"Checking whether the job id ${job.getJobRequest.getId()} timed out. ")
+ val currentTimeSeconds = System.currentTimeMillis() / 1000
+ // job.isWaiting == job in queue
+ val jobScheduleStartTimeSeconds = if (job.isWaiting) job.createTime / 1000 else currentTimeSeconds
+ val queuingTimeSeconds = currentTimeSeconds - jobScheduleStartTimeSeconds
+ val jobRunningStartTimeSeconds = if (job.getStartTime > 0) job.getStartTime / 1000 else currentTimeSeconds
+ val runningTimeSeconds = currentTimeSeconds - jobRunningStartTimeSeconds
if (!job.isCompleted) {
job.jobRequest.getLabels foreach {
case queueTimeOutLabel: JobQueuingTimeoutLabel =>
- if (queueTimeOutLabel.getQueuingTimeout > 0 && queuingTime >= queueTimeOutLabel.getQueuingTimeout) {
- warn(s"Job queuing timeout, cancel it now: ${job.getId()}")
- job.cancel()
+ if (job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 && queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout) {
+ logger.warn(s"Job ${job.getJobRequest.getId()} queued time : ${queuingTimeSeconds} seconds, which was over queueTimeOut : ${queueTimeOutLabel.getQueuingTimeout} seconds, cancel it now! ")
+ job.onFailure(s"Job queued ${queuingTimeSeconds} seconds over max queue time : ${queueTimeOutLabel.getQueuingTimeout} seconds.", null)
}
case jobRunningTimeoutLabel: JobRunningTimeoutLabel =>
- if (jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTime >= jobRunningTimeoutLabel.getRunningTimeout) {
- warn(s"Job running timeout, cancel it now: ${job.getId()}")
- job.cancel()
+ if (job.isRunning && jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTimeSeconds >= jobRunningTimeoutLabel.getRunningTimeout) {
+ logger.warn(s"Job ${job.getJobRequest.getId()} run timeout ${runningTimeSeconds} seconds, which was over runTimeOut : ${jobRunningTimeoutLabel.getRunningTimeout} seconds, cancel it now! ")
+ job.onFailure(s"Job run ${runningTimeSeconds} seconds over max run time : ${jobRunningTimeoutLabel.getRunningTimeout} seconds.", null)
}
case _ =>
}
@@ -97,7 +100,7 @@ class JobTimeoutManager extends Logging {
}
timeoutJobByName.foreach(item => {
- info(s"Running timeout detection!")
+ logger.info(s"Running timeout detection!")
synchronized {
jobCompleteDelete(item._1)
if (jobExist(item._1)) checkAndSwitch(item._2)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org