You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/04/20 08:11:17 UTC
[incubator-linkis] 03/04: Code optimization, optimized synchronization lock #2015
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 5da06151dd7ed75037017fa9a005076f0c9a7ed7
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 20 14:57:09 2022 +0800
Code optimization, optimized synchronization lock #2015
---
.../computation/executor/execute/ComputationExecutor.scala | 4 +++-
.../computation/executor/service/TaskExecutionServiceImpl.scala | 8 +++++---
.../linkis/engineplugin/spark/executor/SparkScalaExecutor.scala | 2 +-
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index 3491cb9d3..1d14e4821 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -76,6 +76,8 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) extends Acc
private val MAX_TASK_EXECUTE_NUM = ComputationExecutorConf.ENGINE_MAX_TASK_EXECUTE_NUM.getValue
+ private val CLOSE_LOCKER = new Object
+
protected def setInitialized(inited: Boolean = true): Unit = this.engineInitialized = inited
final override def tryReady(): Boolean = {
@@ -120,7 +122,7 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) extends Acc
protected def callback(): Unit = {}
override def close(): Unit = {
- if (null != lastTask) synchronized {
+ if (null != lastTask) CLOSE_LOCKER.synchronized {
killTask(lastTask.getTaskId)
} else {
killTask("By close")
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 1d1575f0d..6e927867f 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -83,13 +83,15 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
lazy private val cachedThreadPool = Utils.newCachedThreadPool(ComputationExecutorConf.ENGINE_CONCURRENT_THREAD_NUM.getValue,
"ConcurrentEngineConnThreadPool")
+ private val CONCURRENT_TASK_LOCKER = new Object
+
@PostConstruct
def init(): Unit = {
LogHelper.setLogListener(this)
syncListenerBus.addListener(this)
}
- private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = synchronized {
+ private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = {
Utils.tryCatch {
var sender: Sender = null
if (null != task && null != task.getCallbackServiceInstance() && null != msg) {
@@ -234,13 +236,13 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
}
private def submitConcurrentTask(task: CommonEngineConnTask, executor: ConcurrentComputationExecutor): ExecuteResponse = {
- if (null == concurrentTaskQueue) synchronized {
+ if (null == concurrentTaskQueue) CONCURRENT_TASK_LOCKER.synchronized {
if (null == concurrentTaskQueue) {
concurrentTaskQueue = new LinkedBlockingDeque[EngineConnTask]()
}
}
concurrentTaskQueue.put(task)
- if (null == consumerThread) synchronized {
+ if (null == consumerThread) CONCURRENT_TASK_LOCKER.synchronized {
if (null == consumerThread) {
consumerThread = new Thread(createConsumerRunnable(executor))
consumerThread.setDaemon(true)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index 55701ccfa..938a0935a 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -147,7 +147,7 @@ class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long) exten
res
}
- def executeLine(code: String, engineExecutionContext: EngineExecutionContext): ExecuteResponse = synchronized {
+ def executeLine(code: String, engineExecutionContext: EngineExecutionContext): ExecuteResponse = {
if(sparkContext.isStopped) {
error("Spark application has already stopped, please restart it.")
throw new ApplicationAlreadyStoppedException(40004,"Spark application has already stopped, please restart it.")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org