You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/04/06 02:36:54 UTC

[incubator-linkis] 03/03: update consumer idle default value

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit a337bc4f3700995f0eccf78de4550060db5a0887
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Apr 4 10:48:03 2022 +0800

    update consumer idle default value
---
 .../org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala | 4 ++--
 .../scheduler/queue/parallelqueue/ParallelConsumerManager.scala   | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
index c14bf0f3c..05dba4430 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
@@ -23,9 +23,9 @@ object SchedulerConfiguration {
 
     val FIFO_CONSUMER_AUTO_CLEAR_ENABLED = CommonVars("wds.linkis.fifo.consumer.auto.clear.enabled", true)
 
-    val FIFO_CONSUMER_MAX_IDLE_TIME = CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("2h")).getValue.toLong
+    val FIFO_CONSUMER_MAX_IDLE_TIME = CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("1h")).getValue.toLong
 
-    val FIFO_CONSUMER_IDLE_SCAN_INTERVAL = CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("6h"))
+    val FIFO_CONSUMER_IDLE_SCAN_INTERVAL = CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("2h"))
 
     val FIFO_CONSUMER_IDLE_SCAN_INIT_TIME = CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s"))
 
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index 889599d9e..c8fb5360f 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -51,9 +51,11 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) e
       override def run(): Unit = CONSUMER_LOCK.synchronized {
         info("Start to Clean up idle consumers ")
         val nowTime = System.currentTimeMillis()
-        consumerGroupMap.values.filter(_.isIdle)
-          .filter(consumer => nowTime - consumer.getLastTime > SchedulerConfiguration.FIFO_CONSUMER_MAX_IDLE_TIME)
-          .foreach(consumer => destroyConsumer(consumer.getGroup.getGroupName))
+        Utils.tryAndWarn {
+          consumerGroupMap.values.filter(_.isIdle)
+            .filter(consumer => nowTime - consumer.getLastTime > SchedulerConfiguration.FIFO_CONSUMER_MAX_IDLE_TIME)
+            .foreach(consumer => destroyConsumer(consumer.getGroup.getGroupName))
+        }
         info(s"Finished to clean up idle consumers for $schedulerName, cost ${System.currentTimeMillis() - nowTime} ms.")
       }
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org