You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/04/06 02:36:52 UTC

[incubator-linkis] 01/03: The group cache in EntranceFactory is replaced with guava cache #1901

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit e3b8d34ba0b82727db0261cc4a15a44f5c0c8517
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Apr 4 10:46:41 2022 +0800

    The group cache in EntranceFactory is replaced with guava cache #1901
---
 .../entrance/conf/EntranceConfiguration.scala      |  6 ++-
 .../entrance/scheduler/EntranceGroupFactory.scala  | 48 +++++++++++-----------
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index df4ac7d01..916bcc8a3 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -83,7 +83,7 @@ object EntranceConfiguration {
     * wds.linkis.dwc.instance is a parameter used to control the number of engines each user starts.
     *wds.linkis.instance 是用来进行控制每个用户启动engine数量的参数
     */
-  val WDS_LINKIS_INSTANCE = CommonVars("wds.linkis.rm.instance", 3)
+  val WDS_LINKIS_INSTANCE = CommonVars("wds.linkis.rm.instance", 10)
 
   val LOG_EXCLUDE_ALL = CommonVars("wds.linkis.log.exclude.all", "com.netflix")
 
@@ -172,4 +172,8 @@ object EntranceConfiguration {
   val CLI_HEARTBEAT_THRESHOLD_SECONDS = CommonVars[Long] ("linkis.entrance.cli.heartbeat.threshold.sec", 30l).getValue
 
   val LOG_PUSH_INTERVAL_TIME = CommonVars("wds.linkis.entrance.log.push.interval.time", 5 * 60 * 1000)
+
+  val GRORUP_CACHE_MAX = CommonVars("wds.linkis.consumer.group.cache.capacity", 5000)
+
+  val GRORUP_CACHE_EXPITE_TIME = CommonVars("wds.linkis.consumer.group.expire.time.hour", 50)
 }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index 6e925711f..1db867b3f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -17,33 +17,36 @@
  
 package org.apache.linkis.entrance.scheduler
 
-import java.util
-
+import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.commons.lang.StringUtils
 import org.apache.linkis.common.conf.{CommonVars, Configuration}
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
 import org.apache.linkis.entrance.execute.EntranceJob
-import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
+import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.utils.TaskUtils
 import org.apache.linkis.rpc.Sender
 import org.apache.linkis.scheduler.queue.parallelqueue.ParallelGroup
 import org.apache.linkis.scheduler.queue.{Group, GroupFactory, SchedulerEvent}
-import org.apache.linkis.server.JMap
-import org.apache.commons.lang.StringUtils
 
+import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.JavaConversions._
 
 
 class EntranceGroupFactory extends GroupFactory with Logging {
 
-  private val groupNameToGroups = new JMap[String, Group]
-//  private val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
+
+  private val groupNameToGroups: Cache[String, Group] = CacheBuilder.newBuilder().expireAfterAccess(EntranceConfiguration.GRORUP_CACHE_EXPITE_TIME.getValue, TimeUnit.MINUTES)
+    .maximumSize(EntranceConfiguration.GRORUP_CACHE_MAX.getValue).build()
 
   private val GROUP_MAX_CAPACITY = CommonVars("wds.linkis.entrance.max.capacity", 2000)
+
   private val GROUP_INIT_CAPACITY = CommonVars("wds.linkis.entrance.init.capacity", 100)
 
 
@@ -53,11 +56,12 @@ class EntranceGroupFactory extends GroupFactory with Logging {
         (job.getJobRequest.getLabels, job.getJobRequest.getParams.asInstanceOf[util.Map[String, Any]])
     }
     val groupName = EntranceGroupFactory.getGroupNameByLabels(labels, params)
-    if (!groupNameToGroups.containsKey(groupName)) synchronized {
+    val cacheGroup = groupNameToGroups.getIfPresent(groupName)
+    if (null == cacheGroup) synchronized {
       val maxAskExecutorTimes = EntranceConfiguration.MAX_ASK_EXECUTOR_TIME.getValue.toLong
       if (groupName.startsWith(EntranceGroupFactory.CONCURRENT)) {
-        if (null == groupNameToGroups.get(groupName)) synchronized {
-          if (null == groupNameToGroups.get(groupName)) {
+        if (null == groupNameToGroups.getIfPresent(groupName)) synchronized {
+          if (null == groupNameToGroups.getIfPresent(groupName)) {
             val group = new ParallelGroup(groupName, 100, EntranceConfiguration.CONCURRENT_FACTORY_MAX_CAPACITY.getValue)
             group.setMaxRunningJobs(EntranceConfiguration.CONCURRENT_MAX_RUNNING_JOBS.getValue)
             group.setMaxAskExecutorTimes(EntranceConfiguration.CONCURRENT_EXECUTOR_TIME.getValue)
@@ -67,36 +71,34 @@ class EntranceGroupFactory extends GroupFactory with Logging {
         }
       }
       val sender: Sender = Sender.getSender(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue)
-      var userCreatorLabel: UserCreatorLabel = null
-      var engineTypeLabel: EngineTypeLabel = null
-      labels.foreach {
-        case label: UserCreatorLabel => userCreatorLabel = label
-        case label: EngineTypeLabel => engineTypeLabel = label
-        case _ =>
-      }
-      info(s"Getting user configurations for $groupName(正在为 $groupName 获取参数) userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.")
+      val userCreatorLabel: UserCreatorLabel = LabelUtil.getUserCreatorLabel(labels)
+      val engineTypeLabel: EngineTypeLabel = LabelUtil.getEngineTypeLabel(labels)
+      logger.info(s"Getting user configurations for $groupName userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.")
       val keyAndValue = Utils.tryAndWarnMsg {
         sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
       }("Get user configurations from configuration server failed! Next use the default value to continue.")
       val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
       val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue)
       val maxCapacity = GROUP_MAX_CAPACITY.getValue(keyAndValue)
-      info(s"Got user configurations: groupName=$groupName, maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity, maxCapacity=$maxCapacity.")
+      logger.info(s"Got user configurations: groupName=$groupName, maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity, maxCapacity=$maxCapacity.")
       val group = new ParallelGroup(groupName, initCapacity, maxCapacity)
       group.setMaxRunningJobs(maxRunningJobs)
       group.setMaxAskExecutorTimes(maxAskExecutorTimes)
-      if (!groupNameToGroups.containsKey(groupName)) groupNameToGroups.put(groupName, group)
+      groupNameToGroups.put(groupName, group)
     }
-    groupNameToGroups.get(groupName)
+    groupNameToGroups.getIfPresent(groupName)
   }
 
   override def getGroup(groupName: String): Group = {
-    val group = groupNameToGroups.get(groupName)
-    if(group == null){
+    val group = groupNameToGroups.getIfPresent(groupName)
+    if (group == null) {
       throw new EntranceErrorException(EntranceErrorCode.GROUP_NOT_FOUND.getErrCode, s"group not found: ${groupName}")
     }
     group
   }
+
+
+
 }
 
 object EntranceGroupFactory {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org