You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/04/20 08:11:17 UTC

[incubator-linkis] 03/04: Code optimization, optimized synchronization lock #2015

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 5da06151dd7ed75037017fa9a005076f0c9a7ed7
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 20 14:57:09 2022 +0800

    Code optimization, optimized synchronization lock #2015
---
 .../computation/executor/execute/ComputationExecutor.scala        | 4 +++-
 .../computation/executor/service/TaskExecutionServiceImpl.scala   | 8 +++++---
 .../linkis/engineplugin/spark/executor/SparkScalaExecutor.scala   | 2 +-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index 3491cb9d3..1d14e4821 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -76,6 +76,8 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) extends Acc
 
   private val MAX_TASK_EXECUTE_NUM = ComputationExecutorConf.ENGINE_MAX_TASK_EXECUTE_NUM.getValue
 
+  private val CLOSE_LOCKER = new Object
+
   protected def setInitialized(inited: Boolean = true): Unit = this.engineInitialized = inited
 
   final override def tryReady(): Boolean = {
@@ -120,7 +122,7 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) extends Acc
   protected def callback(): Unit = {}
 
   override def close(): Unit = {
-    if (null != lastTask) synchronized {
+    if (null != lastTask) CLOSE_LOCKER.synchronized {
       killTask(lastTask.getTaskId)
     } else {
       killTask("By close")
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 1d1575f0d..6e927867f 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -83,13 +83,15 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
   lazy private val cachedThreadPool = Utils.newCachedThreadPool(ComputationExecutorConf.ENGINE_CONCURRENT_THREAD_NUM.getValue,
     "ConcurrentEngineConnThreadPool")
 
+  private val CONCURRENT_TASK_LOCKER = new Object
+
   @PostConstruct
   def init(): Unit = {
     LogHelper.setLogListener(this)
     syncListenerBus.addListener(this)
   }
 
-  private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = synchronized {
+  private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = {
     Utils.tryCatch {
       var sender: Sender = null
       if (null != task && null != task.getCallbackServiceInstance() && null != msg) {
@@ -234,13 +236,13 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
   }
 
   private def submitConcurrentTask(task: CommonEngineConnTask, executor: ConcurrentComputationExecutor): ExecuteResponse = {
-    if (null == concurrentTaskQueue) synchronized {
+    if (null == concurrentTaskQueue) CONCURRENT_TASK_LOCKER.synchronized {
       if (null == concurrentTaskQueue) {
         concurrentTaskQueue = new LinkedBlockingDeque[EngineConnTask]()
       }
     }
     concurrentTaskQueue.put(task)
-    if (null == consumerThread) synchronized {
+    if (null == consumerThread) CONCURRENT_TASK_LOCKER.synchronized {
       if (null == consumerThread) {
         consumerThread = new Thread(createConsumerRunnable(executor))
         consumerThread.setDaemon(true)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index 55701ccfa..938a0935a 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -147,7 +147,7 @@ class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long) exten
 
     res
   }
-  def executeLine(code: String, engineExecutionContext: EngineExecutionContext): ExecuteResponse = synchronized {
+  def executeLine(code: String, engineExecutionContext: EngineExecutionContext): ExecuteResponse = {
     if(sparkContext.isStopped) {
       error("Spark application has already stopped, please restart it.")
       throw new ApplicationAlreadyStoppedException(40004,"Spark application has already stopped, please restart it.")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org