You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/04/21 02:49:30 UTC

[kyuubi] branch master updated: [KYUUBI #4739] Add operation lock instead of locking state Enumeration

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ccacb33c6 [KYUUBI #4739] Add operation lock instead of locking state Enumeration
ccacb33c6 is described below

commit ccacb33c690f4853dab45854dc8063b23c4251a0
Author: fwang12 <fw...@ebay.com>
AuthorDate: Fri Apr 21 10:49:21 2023 +0800

    [KYUUBI #4739] Add operation lock instead of locking state Enumeration
    
    ### _Why are the changes needed?_
    
    We met an issue that causes all operations to get stuck when closing an operation.
    
    This happens because all operations currently lock on a shared Scala Enumeration value (via `state.synchronized`).
    
    If one of them gets stuck, all the others will be stuck as well.
    
    In this PR, I add a dedicated lock for each operation.
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4739 from turboFei/op_lock.
    
    Closes #4739
    
    535400a42 [fwang12] revert
    a93438927 [fwang12] lockInterruptibly
    274abc9db [fwang12] utils
    ceda7314f [fwang12] op lock
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../apache/kyuubi/engine/chat/operation/ChatOperation.scala |  2 +-
 .../kyuubi/engine/flink/operation/FlinkOperation.scala      |  4 ++--
 .../apache/kyuubi/engine/hive/operation/HiveOperation.scala |  2 +-
 .../apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala |  2 +-
 .../kyuubi/engine/spark/operation/ExecutePython.scala       |  7 +------
 .../kyuubi/engine/spark/operation/SparkOperation.scala      |  6 +++---
 .../apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala  |  9 +++------
 .../kyuubi/engine/trino/operation/TrinoOperation.scala      |  4 ++--
 kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala  | 10 ++++++++++
 .../org/apache/kyuubi/operation/AbstractOperation.scala     |  9 +++++++--
 .../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala   | 13 +++++--------
 .../org/apache/kyuubi/operation/BatchJobSubmission.scala    |  4 ++--
 .../scala/org/apache/kyuubi/operation/KyuubiOperation.scala |  8 ++++----
 13 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
index 38527cbf1..bb6e8a8a3 100644
--- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
+++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
@@ -62,7 +62,7 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
     // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
     // could be thrown.
     case e: Throwable =>
-      state.synchronized {
+      withLockRequired {
         val errMsg = Utils.stringifyException(e)
         if (state == OperationState.TIMEOUT) {
           val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index d734cea05..eee2fdc98 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -57,7 +57,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
   }
 
   override protected def afterRun(): Unit = {
-    state.synchronized {
+    withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
@@ -114,7 +114,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
     // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
     // could be thrown.
     case e: Throwable =>
-      state.synchronized {
+      withLockRequired {
         val errMsg = Utils.stringifyException(e)
         if (state == OperationState.TIMEOUT) {
           val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 81affdff3..c02569784 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -46,7 +46,7 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
   }
 
   override def afterRun(): Unit = {
-    state.synchronized {
+    withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 6cac42f49..f4d1c27e7 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -66,7 +66,7 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
     // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
     // could be thrown.
     case e: Throwable =>
-      state.synchronized {
+      withLockRequired {
         val errMsg = Utils.stringifyException(e)
         if (state == OperationState.TIMEOUT) {
           val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d2627fd99..17cc967e6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -180,12 +180,7 @@ case class SessionPythonWorker(
     new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
   private val lock = new ReentrantLock()
 
-  private def withLockRequired[T](block: => T): T = {
-    try {
-      lock.lock()
-      block
-    } finally lock.unlock()
-  }
+  private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
 
   /**
    * Run the python code and return the response. This method maybe invoked internally,
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index cb7510a89..6b2a5d9eb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -101,7 +101,7 @@ abstract class SparkOperation(session: Session)
     super.getStatus
   }
 
-  override def cleanup(targetState: OperationState): Unit = state.synchronized {
+  override def cleanup(targetState: OperationState): Unit = withLockRequired {
     operationListener.foreach(_.cleanup())
     if (!isTerminalState(state)) {
       setState(targetState)
@@ -174,7 +174,7 @@ abstract class SparkOperation(session: Session)
     // could be thrown.
     case e: Throwable =>
       if (cancel && !spark.sparkContext.isStopped) spark.sparkContext.cancelJobGroup(statementId)
-      state.synchronized {
+      withLockRequired {
         val errMsg = Utils.stringifyException(e)
         if (state == OperationState.TIMEOUT) {
           val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
@@ -201,7 +201,7 @@ abstract class SparkOperation(session: Session)
   }
 
   override protected def afterRun(): Unit = {
-    state.synchronized {
+    withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
index 27090fae4..a5437df92 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
@@ -29,6 +29,8 @@ import org.apache.spark.repl.SparkILoop
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.util.MutableURLClassLoader
 
+import org.apache.kyuubi.Utils
+
 private[spark] case class KyuubiSparkILoop private (
     spark: SparkSession,
     output: ByteArrayOutputStream)
@@ -124,10 +126,5 @@ private[spark] object KyuubiSparkILoop {
   }
 
   private val lock = new ReentrantLock()
-  private def withLockRequired[T](block: => T): T = {
-    try {
-      lock.lock()
-      block
-    } finally lock.unlock()
-  }
+  private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
 }
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 6e40f65f2..bff058605 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -75,7 +75,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
   }
 
   override protected def afterRun(): Unit = {
-    state.synchronized {
+    withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
@@ -108,7 +108,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
     // could be thrown.
     case e: Throwable =>
       if (cancel && trino.isRunning) trino.cancelLeafStage()
-      state.synchronized {
+      withLockRequired {
         val errMsg = Utils.stringifyException(e)
         if (state == OperationState.TIMEOUT) {
           val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 3a03682ff..06c572130 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties, TimeZone, UUID}
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.locks.Lock
 
 import scala.collection.JavaConverters._
 import scala.sys.process._
@@ -407,4 +408,13 @@ object Utils extends Logging {
       stringWriter.toString
     }
   }
+
+  def withLockRequired[T](lock: Lock)(block: => T): T = {
+    try {
+      lock.lock()
+      block
+    } finally {
+      lock.unlock()
+    }
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index d50cb8e24..2e52757a2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -18,13 +18,14 @@
 package org.apache.kyuubi.operation
 
 import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
 
-import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState._
@@ -45,7 +46,11 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
 
   private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
 
-  protected def cleanup(targetState: OperationState): Unit = state.synchronized {
+  private val lock: ReentrantLock = new ReentrantLock()
+
+  protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
+
+  protected def cleanup(targetState: OperationState): Unit = withLockRequired {
     if (!isTerminalState(state)) {
       setState(targetState)
       Option(getBackgroundHandle).foreach(_.cancel(true))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 8b8561fa9..587fd5756 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -136,14 +136,11 @@ class KyuubiSyncThriftClient private (
   /**
    * Lock every rpc call to send them sequentially
    */
-  private def withLockAcquired[T](block: => T): T = {
-    try {
-      lock.lock()
-      if (!protocol.getTransport.isOpen) {
-        throw KyuubiSQLException.connectionDoesNotExist()
-      }
-      block
-    } finally lock.unlock()
+  private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) {
+    if (!protocol.getTransport.isOpen) {
+      throw KyuubiSQLException.connectionDoesNotExist()
+    }
+    block
   }
 
   private def withLockAcquiredAsyncRequest[T](block: => T): T = withLockAcquired {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index a723ab4b0..e6433cdc9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -165,7 +165,7 @@ class BatchJobSubmission(
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
 
   // we can not set to other state if it is canceled
-  private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
+  private def setStateIfNotCanceled(newState: OperationState): Unit = withLockRequired {
     if (state != CANCELED) {
       setState(newState)
       applicationId(_applicationInfo).foreach { appId =>
@@ -318,7 +318,7 @@ class BatchJobSubmission(
     }
   }
 
-  override def close(): Unit = state.synchronized {
+  override def close(): Unit = withLockRequired {
     if (!isClosedOrCanceled) {
       try {
         getOperationLog.foreach(_.close())
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 106a11e4b..e0475394e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -59,7 +59,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
 
   protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
     case e: Throwable =>
-      state.synchronized {
+      withLockRequired {
         if (isTerminalState(state)) {
           warn(s"Ignore exception in terminal state with $statementId", e)
         } else {
@@ -101,14 +101,14 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
   }
 
   override protected def afterRun(): Unit = {
-    state.synchronized {
+    withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
     }
   }
 
-  override def cancel(): Unit = state.synchronized {
+  override def cancel(): Unit = withLockRequired {
     if (!isClosedOrCanceled) {
       setState(OperationState.CANCELED)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
@@ -123,7 +123,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
     }
   }
 
-  override def close(): Unit = state.synchronized {
+  override def close(): Unit = withLockRequired {
     if (!isClosedOrCanceled) {
       setState(OperationState.CLOSED)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))