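You are viewing a plain-text version of this content. The canonical version is in the commits@linkis.apache.org mailing-list archive (the original hyperlink was not preserved in this copy).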
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/27 12:09:14 UTC

[incubator-linkis] branch dev-1.1.1 updated: Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new 01d75c9e3 Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)
01d75c9e3 is described below

commit 01d75c9e3c5313cd22b1d05dfa024bc1cfdf1001
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 27 20:09:08 2022 +0800

    Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)
    
    * Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same
---
 .../cli/heartbeat/CliHeartbeatMonitor.scala        | 82 ++++++++++++----------
 .../entrance/conf/EntranceConfiguration.scala      |  2 +
 2 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
index ded4d0494..03a9d700a 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
@@ -17,18 +17,15 @@
 
 package org.apache.linkis.entrance.cli.heartbeat
 
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, ScheduledThreadPoolExecutor, TimeUnit}
-import javax.annotation.PostConstruct
-
+import org.apache.commons.lang3.concurrent.BasicThreadFactory
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
 import org.apache.linkis.entrance.execute.EntranceJob
 import org.apache.linkis.scheduler.queue.Job
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.concurrent.BasicThreadFactory
 
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ScheduledThreadPoolExecutor, TimeUnit}
 import scala.collection.JavaConverters._
 
 class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
@@ -38,7 +35,7 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
 
   def panicIfNull(obj: Any, msg: String): Unit = {
     if (obj == null) {
-          throw new EntranceErrorException(EntranceErrorCode.VARIABLE_NULL_EXCEPTION.getErrCode, msg)
+      throw new EntranceErrorException(EntranceErrorCode.VARIABLE_NULL_EXCEPTION.getErrCode, msg)
     }
   }
 
@@ -46,17 +43,18 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
   register for scan
    */
   def registerIfCliJob(job: Job): Unit = {
-    if (job.isInstanceOf[EntranceJob]) {
-      val entranceJob = job.asInstanceOf[EntranceJob]
-      if (isCliJob(entranceJob)) {
-        val id = job.getJobInfo.getId
-        if (infoMap.containsKey(id)) {
-          error("registered duplicate job!! job-id: " + id)
-        } else {
-          infoMap.put(id, entranceJob)
-          info("registered cli job: " + id)
+    job match {
+      case entranceJob: EntranceJob =>
+        if (isCliJob(entranceJob)) {
+          val id = entranceJob.getJobRequest.getId.toString
+          if (infoMap.containsKey(id)) {
+            logger.error(s"registered duplicate job!! job id: $id")
+          } else {
+            infoMap.put(id, entranceJob)
+            logger.info(s"registered cli job id: $id")
+          }
         }
-      }
+      case _ =>
     }
   }
 
@@ -64,12 +62,14 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
   remove from scan list
  */
   def unRegisterIfCliJob(job: Job): Unit = {
-    if (job.isInstanceOf[EntranceJob]) {
-      val entranceJob = job.asInstanceOf[EntranceJob]
-      if (isCliJob(entranceJob)) {
-        infoMap.remove(job.getJobInfo.getId)
-        info("unregistered cli job: " + job.getJobInfo.getId)
-      }
+    job match {
+      case entranceJob: EntranceJob =>
+        if (isCliJob(entranceJob)) {
+          val id = entranceJob.getJobRequest.getId.toString
+          infoMap.remove(id)
+          logger.info(s"unregistered cli job id: $id")
+        }
+      case _ =>
     }
   }
 
@@ -78,24 +78,25 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
   scan only cli jobs
    */
   def updateHeartbeatIfCliJob(job: Job): Unit = {
-    if (job.isInstanceOf[EntranceJob]) {
-      val entranceJob = job.asInstanceOf[EntranceJob]
-      if (isCliJob(entranceJob)) {
-        val id = job.getJobInfo.getId
-        if (!infoMap.containsKey(id)) error("heartbeat on non-existing job!! job-id: " + id)
-        else infoMap.get(id).updateNewestAccessByClientTimestamp()
-      }
+    job match {
+      case entranceJob: EntranceJob =>
+        if (isCliJob(entranceJob)) {
+          val id = entranceJob.getJobRequest.getId.toString
+          if (!infoMap.containsKey(id)) logger.error(s"heartbeat on non-existing job!! job id: $id")
+          else infoMap.get(id).updateNewestAccessByClientTimestamp()
+        }
+      case _ =>
     }
   }
 
   def start(): Unit = {
     panicIfNull(handler, "handler should not be null")
     clientHeartbeatDaemon.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = Utils.tryCatch(scanOneIteration) {
-        t => error("ClientHeartbeatMonitor failed to scan for one iteration", t)
+      override def run(): Unit = Utils.tryCatch(scanOneIteration()) {
+        t => logger.error("ClientHeartbeatMonitor failed to scan for one iteration", t)
       }
     }, 0, 5, TimeUnit.SECONDS)
-    info("started cliHeartbeatMonitor")
+    logger.info("started cliHeartbeatMonitor")
     Utils.addShutdownHook(() -> this.shutdown())
   }
 
@@ -105,9 +106,9 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
     val problemJobs = new util.ArrayList[EntranceJob]
     while (entries.hasNext) {
       val entry = entries.next
-      debug("Scanned job: " + entry.getKey());
+      logger.debug(s"Scanned job id: ${entry.getKey}");
       if (!isAlive(currentTime, entry.getValue)) {
-        info("Found linkis-cli connection lost job: " + entry.getKey())
+        logger.info(s"Found linkis-cli connection lost job id: ${entry.getKey}")
         problemJobs.add(entry.getValue)
       }
     }
@@ -121,13 +122,18 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
     if (problemJobs.size > 0) {
       handler.handle(problemJobs.asScala.toList)
     }
-    debug("ClientHeartbeatMonitor ends scanning for one iteration")
+    logger.debug("ClientHeartbeatMonitor ends scanning for one iteration")
+  }
+
+  private val monitorCreators = EntranceConfiguration.CLIENT_MONITOR_CREATOR.getValue.split(",")
+
+  private def isCliJob(job: EntranceJob): Boolean = {
+    monitorCreators.exists(job.getCreator.equalsIgnoreCase)
   }
 
-  private def isCliJob(job: EntranceJob): Boolean = StringUtils.equalsIgnoreCase(job.getCreator, "LINKISCLI")
   private def isAlive(currentTime: Long, job: EntranceJob): Boolean = {
     val lastAliveTime = job.getNewestAccessByClientTimestamp
-    return currentTime - lastAliveTime <= clientHeartbeatThreshold
+    (currentTime - lastAliveTime) <= clientHeartbeatThreshold
   }
 
   def shutdown(): Unit = {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 5238c6db9..74f71abb6 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -176,4 +176,6 @@ object EntranceConfiguration {
   val GRORUP_CACHE_MAX = CommonVars("wds.linkis.consumer.group.cache.capacity", 5000)
 
   val GRORUP_CACHE_EXPITE_TIME = CommonVars("wds.linkis.consumer.group.expire.time.hour", 50)
+
+  val CLIENT_MONITOR_CREATOR = CommonVars("wds.linkis.entrance.client.monitor.creator", "LINKISCLI")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org