You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/02/21 11:14:04 UTC
[linkis] branch dev-1.3.2 updated: Fix RPC Sender memory leak. close #4251 (#4252)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new afc37d4a8 Fix RPC Sender memory leak. close #4251 (#4252)
afc37d4a8 is described below
commit afc37d4a81c4bf49e011d67767bbfec07bae510c
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Feb 21 19:13:57 2023 +0800
Fix RPC Sender memory leak. close #4251 (#4252)
---
.../main/scala/org/apache/linkis/rpc/Sender.scala | 24 +++++++++++++---------
.../apache/linkis/rpc/conf/RPCConfiguration.scala | 4 ++++
2 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/Sender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/Sender.scala
index 1ebdd0ec0..c6af26e6d 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/Sender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/Sender.scala
@@ -23,9 +23,12 @@ import org.apache.linkis.rpc.conf.RPCConfiguration
import org.apache.linkis.rpc.utils.RPCUtils
import java.util
+import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
abstract class Sender {
/**
@@ -74,12 +77,20 @@ abstract class Sender {
}
object Sender {
- // TODO needs to consider whether the sender will be a singleton, will there be communication problems?
- // TODO 需要考虑将sender做成单例后,会不会出现通信问题
private val senderFactory: SenderFactory = SenderFactory.getFactory
- private val serviceInstanceToSenders = new util.HashMap[ServiceInstance, Sender]
+ private val serviceInstanceToSenders: LoadingCache[ServiceInstance, Sender] = CacheBuilder
+ .newBuilder()
+ .maximumSize(50000)
+ .expireAfterAccess(RPCConfiguration.SENDER_CACHE_CLEANING_HOUR, TimeUnit.HOURS)
+ .build(new CacheLoader[ServiceInstance, Sender]() {
+
+ override def load(serviceInstance: ServiceInstance): Sender = {
+ senderFactory.createSender(serviceInstance)
+ }
+
+ })
def getSender(applicationName: String): Sender = getSender(ServiceInstance(applicationName, null))
@@ -87,13 +98,6 @@ object Sender {
if (RPCUtils.isPublicService(serviceInstance.getApplicationName)) {
serviceInstance.setApplicationName(RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue)
}
- if (!serviceInstanceToSenders.containsKey(serviceInstance)) {
- serviceInstanceToSenders synchronized {
- if (!serviceInstanceToSenders.containsKey(serviceInstance)) {
- serviceInstanceToSenders.put(serviceInstance, senderFactory.createSender(serviceInstance))
- }
- }
- }
serviceInstanceToSenders.get(serviceInstance)
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
index fb3074dc6..32f12da27 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
@@ -98,6 +98,10 @@ object RPCConfiguration {
val ENABLE_SPRING_PARAMS: Boolean =
CommonVars("wds.linkis.rpc.spring.params.enable", false).getValue
+ // unit is HOUR
+ val SENDER_CACHE_CLEANING_HOUR =
+ CommonVars("linkis.rpc.sender.cache.cleaning.time.hour", 6).getValue
+
val REFLECTIONS = new Reflections(
SERVICE_SCAN_PACKAGE,
new MethodAnnotationsScanner(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org