You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/07 12:34:15 UTC

[incubator-linkis] branch dev-1.1.3 updated: linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
     new adfc90243 linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
adfc90243 is described below

commit adfc90243796286043b52c82a2ea58080187d37a
Author: Alexyang <xu...@qq.com>
AuthorDate: Tue Jun 7 20:34:11 2022 +0800

    linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
---
 .../linkis/scheduler/queue/SchedulerEvent.scala    |  6 ++++++
 .../entrance/timeout/JobTimeoutManager.scala       | 25 ++++++++++++----------
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index ede98987c..d15ba840a 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -33,6 +33,12 @@ trait SchedulerEvent extends Logging {
 
   def getEndTime = endTime
   def getStartTime = startTime
+
+  /*
+   * To be compatible with old versions.
+   * It's not recommended to use scheduledTime, which is only a few milliseconds most of the time.
+   */
+  @Deprecated
   def getScheduledTime = scheduledTime
 
   def getId = id
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
index 3c14ec211..b9ec38941 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
@@ -75,21 +75,24 @@ class JobTimeoutManager extends Logging {
   private def timeoutDetective(): Unit = {
     if (timeoutCheck) {
       def checkAndSwitch(job: EntranceJob): Unit = {
-        info(s"Checking whether the job timed out: ${job.getId()}")
-        val currentTime = System.currentTimeMillis() / 1000
-        val queuingTime = currentTime - job.getScheduledTime / 1000
-        val runningTime = currentTime - job.getStartTime / 1000
+        info(s"Checking whether the job id ${job.getJobRequest.getId()} timed out. ")
+        val currentTimeSeconds = System.currentTimeMillis() / 1000
+        // job.isWaiting == job in queue
+        val jobScheduleStartTimeSeconds = if (job.isWaiting) job.createTime / 1000 else currentTimeSeconds
+        val queuingTimeSeconds = currentTimeSeconds - jobScheduleStartTimeSeconds
+        val jobRunningStartTimeSeconds = if (job.getStartTime > 0) job.getStartTime / 1000 else currentTimeSeconds
+        val runningTimeSeconds = currentTimeSeconds - jobRunningStartTimeSeconds
         if (!job.isCompleted) {
           job.jobRequest.getLabels foreach {
             case queueTimeOutLabel: JobQueuingTimeoutLabel =>
-              if (queueTimeOutLabel.getQueuingTimeout > 0 && queuingTime >= queueTimeOutLabel.getQueuingTimeout) {
-                warn(s"Job queuing timeout, cancel it now: ${job.getId()}")
-                job.cancel()
+              if (job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 && queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout) {
+                logger.warn(s"Job ${job.getJobRequest.getId()} queued time : ${queuingTimeSeconds} seconds, which was over queueTimeOut : ${queueTimeOutLabel.getQueuingTimeout} seconds, cancel it now! ")
+                job.onFailure(s"Job queued ${queuingTimeSeconds} seconds over max queue time : ${queueTimeOutLabel.getQueuingTimeout} seconds.", null)
               }
             case jobRunningTimeoutLabel: JobRunningTimeoutLabel =>
-              if (jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTime >= jobRunningTimeoutLabel.getRunningTimeout) {
-                warn(s"Job running timeout, cancel it now: ${job.getId()}")
-                job.cancel()
+              if (job.isRunning && jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTimeSeconds >= jobRunningTimeoutLabel.getRunningTimeout) {
+                logger.warn(s"Job ${job.getJobRequest.getId()} run timeout ${runningTimeSeconds} seconds, which was over runTimeOut : ${jobRunningTimeoutLabel.getRunningTimeout} seconds, cancel it now! ")
+                job.onFailure(s"Job run ${runningTimeSeconds} seconds over max run time : ${jobRunningTimeoutLabel.getRunningTimeout} seconds.", null)
               }
             case _ =>
           }
@@ -97,7 +100,7 @@ class JobTimeoutManager extends Logging {
       }
 
       timeoutJobByName.foreach(item => {
-        info(s"Running timeout detection!")
+        logger.info(s"Running timeout detection!")
         synchronized {
           jobCompleteDelete(item._1)
           if (jobExist(item._1)) checkAndSwitch(item._2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org