You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/27 12:09:14 UTC
[incubator-linkis] branch dev-1.1.1 updated: Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 01d75c9e3 Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)
01d75c9e3 is described below
commit 01d75c9e3c5313cd22b1d05dfa024bc1cfdf1001
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 27 20:09:08 2022 +0800
Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same (#2039)
* Fix the issue that the keys of put and remove in Map used in CliHeartbeatMonitor are not the same
---
.../cli/heartbeat/CliHeartbeatMonitor.scala | 82 ++++++++++++----------
.../entrance/conf/EntranceConfiguration.scala | 2 +
2 files changed, 46 insertions(+), 38 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
index ded4d0494..03a9d700a 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
@@ -17,18 +17,15 @@
package org.apache.linkis.entrance.cli.heartbeat
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, ScheduledThreadPoolExecutor, TimeUnit}
-import javax.annotation.PostConstruct
-
+import org.apache.commons.lang3.concurrent.BasicThreadFactory
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.scheduler.queue.Job
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.concurrent.BasicThreadFactory
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ScheduledThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
@@ -38,7 +35,7 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
def panicIfNull(obj: Any, msg: String): Unit = {
if (obj == null) {
- throw new EntranceErrorException(EntranceErrorCode.VARIABLE_NULL_EXCEPTION.getErrCode, msg)
+ throw new EntranceErrorException(EntranceErrorCode.VARIABLE_NULL_EXCEPTION.getErrCode, msg)
}
}
@@ -46,17 +43,18 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
register for scan
*/
def registerIfCliJob(job: Job): Unit = {
- if (job.isInstanceOf[EntranceJob]) {
- val entranceJob = job.asInstanceOf[EntranceJob]
- if (isCliJob(entranceJob)) {
- val id = job.getJobInfo.getId
- if (infoMap.containsKey(id)) {
- error("registered duplicate job!! job-id: " + id)
- } else {
- infoMap.put(id, entranceJob)
- info("registered cli job: " + id)
+ job match {
+ case entranceJob: EntranceJob =>
+ if (isCliJob(entranceJob)) {
+ val id = entranceJob.getJobRequest.getId.toString
+ if (infoMap.containsKey(id)) {
+ logger.error(s"registered duplicate job!! job id: $id")
+ } else {
+ infoMap.put(id, entranceJob)
+ logger.info(s"registered cli job id: $id")
+ }
}
- }
+ case _ =>
}
}
@@ -64,12 +62,14 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
remove from scan list
*/
def unRegisterIfCliJob(job: Job): Unit = {
- if (job.isInstanceOf[EntranceJob]) {
- val entranceJob = job.asInstanceOf[EntranceJob]
- if (isCliJob(entranceJob)) {
- infoMap.remove(job.getJobInfo.getId)
- info("unregistered cli job: " + job.getJobInfo.getId)
- }
+ job match {
+ case entranceJob: EntranceJob =>
+ if (isCliJob(entranceJob)) {
+ val id = entranceJob.getJobRequest.getId.toString
+ infoMap.remove(id)
+ logger.info(s"unregistered cli job id: $id")
+ }
+ case _ =>
}
}
@@ -78,24 +78,25 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
scan only cli jobs
*/
def updateHeartbeatIfCliJob(job: Job): Unit = {
- if (job.isInstanceOf[EntranceJob]) {
- val entranceJob = job.asInstanceOf[EntranceJob]
- if (isCliJob(entranceJob)) {
- val id = job.getJobInfo.getId
- if (!infoMap.containsKey(id)) error("heartbeat on non-existing job!! job-id: " + id)
- else infoMap.get(id).updateNewestAccessByClientTimestamp()
- }
+ job match {
+ case entranceJob: EntranceJob =>
+ if (isCliJob(entranceJob)) {
+ val id = entranceJob.getJobRequest.getId.toString
+ if (!infoMap.containsKey(id)) logger.error(s"heartbeat on non-existing job!! job id: $id")
+ else infoMap.get(id).updateNewestAccessByClientTimestamp()
+ }
+ case _ =>
}
}
def start(): Unit = {
panicIfNull(handler, "handler should not be null")
clientHeartbeatDaemon.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryCatch(scanOneIteration) {
- t => error("ClientHeartbeatMonitor failed to scan for one iteration", t)
+ override def run(): Unit = Utils.tryCatch(scanOneIteration()) {
+ t => logger.error("ClientHeartbeatMonitor failed to scan for one iteration", t)
}
}, 0, 5, TimeUnit.SECONDS)
- info("started cliHeartbeatMonitor")
+ logger.info("started cliHeartbeatMonitor")
Utils.addShutdownHook(() -> this.shutdown())
}
@@ -105,9 +106,9 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
val problemJobs = new util.ArrayList[EntranceJob]
while (entries.hasNext) {
val entry = entries.next
- debug("Scanned job: " + entry.getKey());
+ logger.debug(s"Scanned job id: ${entry.getKey}");
if (!isAlive(currentTime, entry.getValue)) {
- info("Found linkis-cli connection lost job: " + entry.getKey())
+ logger.info(s"Found linkis-cli connection lost job id: ${entry.getKey}")
problemJobs.add(entry.getValue)
}
}
@@ -121,13 +122,18 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) extends Logging {
if (problemJobs.size > 0) {
handler.handle(problemJobs.asScala.toList)
}
- debug("ClientHeartbeatMonitor ends scanning for one iteration")
+ logger.debug("ClientHeartbeatMonitor ends scanning for one iteration")
+ }
+
+ private val monitorCreators = EntranceConfiguration.CLIENT_MONITOR_CREATOR.getValue.split(",")
+
+ private def isCliJob(job: EntranceJob): Boolean = {
+ monitorCreators.exists(job.getCreator.equalsIgnoreCase)
}
- private def isCliJob(job: EntranceJob): Boolean = StringUtils.equalsIgnoreCase(job.getCreator, "LINKISCLI")
private def isAlive(currentTime: Long, job: EntranceJob): Boolean = {
val lastAliveTime = job.getNewestAccessByClientTimestamp
- return currentTime - lastAliveTime <= clientHeartbeatThreshold
+ (currentTime - lastAliveTime) <= clientHeartbeatThreshold
}
def shutdown(): Unit = {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 5238c6db9..74f71abb6 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -176,4 +176,6 @@ object EntranceConfiguration {
val GRORUP_CACHE_MAX = CommonVars("wds.linkis.consumer.group.cache.capacity", 5000)
val GRORUP_CACHE_EXPITE_TIME = CommonVars("wds.linkis.consumer.group.expire.time.hour", 50)
+
+ val CLIENT_MONITOR_CREATOR = CommonVars("wds.linkis.entrance.client.monitor.creator", "LINKISCLI")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org