You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/04/06 02:36:52 UTC
[incubator-linkis] 01/03: The group cache in EntranceFactory is replaced with guava cache #1901
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit e3b8d34ba0b82727db0261cc4a15a44f5c0c8517
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Apr 4 10:46:41 2022 +0800
The group cache in EntranceFactory is replaced with guava cache #1901
---
.../entrance/conf/EntranceConfiguration.scala | 6 ++-
.../entrance/scheduler/EntranceGroupFactory.scala | 48 +++++++++++-----------
2 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index df4ac7d01..916bcc8a3 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -83,7 +83,7 @@ object EntranceConfiguration {
* wds.linkis.dwc.instance is a parameter used to control the number of engines each user starts.
*wds.linkis.instance 是用来进行控制每个用户启动engine数量的参数
*/
- val WDS_LINKIS_INSTANCE = CommonVars("wds.linkis.rm.instance", 3)
+ val WDS_LINKIS_INSTANCE = CommonVars("wds.linkis.rm.instance", 10)
val LOG_EXCLUDE_ALL = CommonVars("wds.linkis.log.exclude.all", "com.netflix")
@@ -172,4 +172,8 @@ object EntranceConfiguration {
val CLI_HEARTBEAT_THRESHOLD_SECONDS = CommonVars[Long] ("linkis.entrance.cli.heartbeat.threshold.sec", 30l).getValue
val LOG_PUSH_INTERVAL_TIME = CommonVars("wds.linkis.entrance.log.push.interval.time", 5 * 60 * 1000)
+
+ val GRORUP_CACHE_MAX = CommonVars("wds.linkis.consumer.group.cache.capacity", 5000)
+
+ val GRORUP_CACHE_EXPITE_TIME = CommonVars("wds.linkis.consumer.group.expire.time.hour", 50)
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index 6e925711f..1db867b3f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -17,33 +17,36 @@
package org.apache.linkis.entrance.scheduler
-import java.util
-
+import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.commons.lang.StringUtils
import org.apache.linkis.common.conf.{CommonVars, Configuration}
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.execute.EntranceJob
-import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
+import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel}
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.queue.parallelqueue.ParallelGroup
import org.apache.linkis.scheduler.queue.{Group, GroupFactory, SchedulerEvent}
-import org.apache.linkis.server.JMap
-import org.apache.commons.lang.StringUtils
+import java.util
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
class EntranceGroupFactory extends GroupFactory with Logging {
- private val groupNameToGroups = new JMap[String, Group]
-// private val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
+
+ private val groupNameToGroups: Cache[String, Group] = CacheBuilder.newBuilder().expireAfterAccess(EntranceConfiguration.GRORUP_CACHE_EXPITE_TIME.getValue, TimeUnit.MINUTES)
+ .maximumSize(EntranceConfiguration.GRORUP_CACHE_MAX.getValue).build()
private val GROUP_MAX_CAPACITY = CommonVars("wds.linkis.entrance.max.capacity", 2000)
+
private val GROUP_INIT_CAPACITY = CommonVars("wds.linkis.entrance.init.capacity", 100)
@@ -53,11 +56,12 @@ class EntranceGroupFactory extends GroupFactory with Logging {
(job.getJobRequest.getLabels, job.getJobRequest.getParams.asInstanceOf[util.Map[String, Any]])
}
val groupName = EntranceGroupFactory.getGroupNameByLabels(labels, params)
- if (!groupNameToGroups.containsKey(groupName)) synchronized {
+ val cacheGroup = groupNameToGroups.getIfPresent(groupName)
+ if (null == cacheGroup) synchronized {
val maxAskExecutorTimes = EntranceConfiguration.MAX_ASK_EXECUTOR_TIME.getValue.toLong
if (groupName.startsWith(EntranceGroupFactory.CONCURRENT)) {
- if (null == groupNameToGroups.get(groupName)) synchronized {
- if (null == groupNameToGroups.get(groupName)) {
+ if (null == groupNameToGroups.getIfPresent(groupName)) synchronized {
+ if (null == groupNameToGroups.getIfPresent(groupName)) {
val group = new ParallelGroup(groupName, 100, EntranceConfiguration.CONCURRENT_FACTORY_MAX_CAPACITY.getValue)
group.setMaxRunningJobs(EntranceConfiguration.CONCURRENT_MAX_RUNNING_JOBS.getValue)
group.setMaxAskExecutorTimes(EntranceConfiguration.CONCURRENT_EXECUTOR_TIME.getValue)
@@ -67,36 +71,34 @@ class EntranceGroupFactory extends GroupFactory with Logging {
}
}
val sender: Sender = Sender.getSender(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue)
- var userCreatorLabel: UserCreatorLabel = null
- var engineTypeLabel: EngineTypeLabel = null
- labels.foreach {
- case label: UserCreatorLabel => userCreatorLabel = label
- case label: EngineTypeLabel => engineTypeLabel = label
- case _ =>
- }
- info(s"Getting user configurations for $groupName(正在为 $groupName 获取参数) userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.")
+ val userCreatorLabel: UserCreatorLabel = LabelUtil.getUserCreatorLabel(labels)
+ val engineTypeLabel: EngineTypeLabel = LabelUtil.getEngineTypeLabel(labels)
+ logger.info(s"Getting user configurations for $groupName userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.")
val keyAndValue = Utils.tryAndWarnMsg {
sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
}("Get user configurations from configuration server failed! Next use the default value to continue.")
val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue)
val maxCapacity = GROUP_MAX_CAPACITY.getValue(keyAndValue)
- info(s"Got user configurations: groupName=$groupName, maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity, maxCapacity=$maxCapacity.")
+ logger.info(s"Got user configurations: groupName=$groupName, maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity, maxCapacity=$maxCapacity.")
val group = new ParallelGroup(groupName, initCapacity, maxCapacity)
group.setMaxRunningJobs(maxRunningJobs)
group.setMaxAskExecutorTimes(maxAskExecutorTimes)
- if (!groupNameToGroups.containsKey(groupName)) groupNameToGroups.put(groupName, group)
+ groupNameToGroups.put(groupName, group)
}
- groupNameToGroups.get(groupName)
+ groupNameToGroups.getIfPresent(groupName)
}
override def getGroup(groupName: String): Group = {
- val group = groupNameToGroups.get(groupName)
- if(group == null){
+ val group = groupNameToGroups.getIfPresent(groupName)
+ if (group == null) {
throw new EntranceErrorException(EntranceErrorCode.GROUP_NOT_FOUND.getErrCode, s"group not found: ${groupName}")
}
group
}
+
+
+
}
object EntranceGroupFactory {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org