You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/21 12:02:24 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4739] Add operation lock instead of locking state Enumeration
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 2cb42c2e2 [KYUUBI #4739] Add operation lock instead of locking state Enumeration
2cb42c2e2 is described below
commit 2cb42c2e299bc2203f63e028c92ccf8966f97642
Author: fwang12 <fw...@ebay.com>
AuthorDate: Fri Apr 21 10:49:21 2023 +0800
[KYUUBI #4739] Add operation lock instead of locking state Enumeration
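We hit an issue that caused all operations to get stuck when closing an operation.
This happened because every operation synchronized on its `state`, which holds a Scala Enumeration value shared by all operations in that state, rather than a per-operation object.
So if one operation got stuck while holding that monitor, all the others got stuck as well.
In this PR, I add a dedicated lock to each operation.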
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4739 from turboFei/op_lock.
Closes #4739
535400a42 [fwang12] revert
a93438927 [fwang12] lockInterruptibly
274abc9db [fwang12] utils
ceda7314f [fwang12] op lock
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../kyuubi/engine/flink/operation/FlinkOperation.scala | 4 ++--
.../apache/kyuubi/engine/hive/operation/HiveOperation.scala | 2 +-
.../apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala | 2 +-
.../kyuubi/engine/spark/operation/ExecutePython.scala | 7 +------
.../kyuubi/engine/spark/operation/SparkOperation.scala | 6 +++---
.../apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala | 9 +++------
.../kyuubi/engine/trino/operation/TrinoOperation.scala | 4 ++--
kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala | 10 ++++++++++
.../org/apache/kyuubi/operation/AbstractOperation.scala | 9 +++++++--
.../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala | 13 +++++--------
.../org/apache/kyuubi/operation/BatchJobSubmission.scala | 4 ++--
.../scala/org/apache/kyuubi/operation/KyuubiOperation.scala | 8 ++++----
12 files changed, 41 insertions(+), 37 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2859d659e..ba450828a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -52,7 +52,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
}
override protected def afterRun(): Unit = {
- state.synchronized {
+ withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
@@ -109,7 +109,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
- state.synchronized {
+ withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 81affdff3..c02569784 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -46,7 +46,7 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
}
override def afterRun(): Unit = {
- state.synchronized {
+ withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 6cac42f49..f4d1c27e7 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -66,7 +66,7 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
- state.synchronized {
+ withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d2627fd99..17cc967e6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -180,12 +180,7 @@ case class SessionPythonWorker(
new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
private val lock = new ReentrantLock()
- private def withLockRequired[T](block: => T): T = {
- try {
- lock.lock()
- block
- } finally lock.unlock()
- }
+  private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock) { block }
/**
* Run the python code and return the response. This method maybe invoked internally,
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index cb7510a89..6b2a5d9eb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -101,7 +101,7 @@ abstract class SparkOperation(session: Session)
super.getStatus
}
- override def cleanup(targetState: OperationState): Unit = state.synchronized {
+ override def cleanup(targetState: OperationState): Unit = withLockRequired {
operationListener.foreach(_.cleanup())
if (!isTerminalState(state)) {
setState(targetState)
@@ -174,7 +174,7 @@ abstract class SparkOperation(session: Session)
// could be thrown.
case e: Throwable =>
if (cancel && !spark.sparkContext.isStopped) spark.sparkContext.cancelJobGroup(statementId)
- state.synchronized {
+ withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
@@ -201,7 +201,7 @@ abstract class SparkOperation(session: Session)
}
override protected def afterRun(): Unit = {
- state.synchronized {
+ withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
index 27090fae4..a5437df92 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
@@ -29,6 +29,8 @@ import org.apache.spark.repl.SparkILoop
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.MutableURLClassLoader
+import org.apache.kyuubi.Utils
+
private[spark] case class KyuubiSparkILoop private (
spark: SparkSession,
output: ByteArrayOutputStream)
@@ -124,10 +126,5 @@ private[spark] object KyuubiSparkILoop {
}
private val lock = new ReentrantLock()
- private def withLockRequired[T](block: => T): T = {
- try {
- lock.lock()
- block
- } finally lock.unlock()
- }
+  private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock) { block }
}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 6e40f65f2..bff058605 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -75,7 +75,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
}
override protected def afterRun(): Unit = {
- state.synchronized {
+ withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
@@ -108,7 +108,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
// could be thrown.
case e: Throwable =>
if (cancel && trino.isRunning) trino.cancelLeafStage()
- state.synchronized {
+ withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 7ab312fa1..bde8f24b1 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.locks.Lock
import scala.collection.JavaConverters._
import scala.sys.process._
@@ -387,4 +388,13 @@ object Utils extends Logging {
Option(Thread.currentThread().getContextClassLoader).getOrElse(getKyuubiClassLoader)
def isOnK8s: Boolean = Files.exists(Paths.get("/var/run/secrets/kubernetes.io"))
+
+  /** Runs `block` while holding `lock`; the lock is released even if `block` throws. */
+  def withLockRequired[T](lock: Lock)(block: => T): T = {
+    // Acquire outside `try`: unlocking a lock we never acquired would throw and mask the error.
+    lock.lock()
+    try {
+      block
+    } finally lock.unlock()
+  }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 9cdd6a8f0..75d75dbb0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -18,13 +18,14 @@
package org.apache.kyuubi.operation
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
-import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState._
@@ -45,7 +46,11 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
- protected def cleanup(targetState: OperationState): Unit = state.synchronized {
+ private val lock: ReentrantLock = new ReentrantLock() // one per operation, not a shared monitor
+  /** Runs `block` while holding this operation's own lock; used e.g. by `cleanup` below. */
+ protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
+
+ protected def cleanup(targetState: OperationState): Unit = withLockRequired {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 8b8561fa9..587fd5756 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -136,14 +136,11 @@ class KyuubiSyncThriftClient private (
/**
* Lock every rpc call to send them sequentially
*/
- private def withLockAcquired[T](block: => T): T = {
- try {
- lock.lock()
- if (!protocol.getTransport.isOpen) {
- throw KyuubiSQLException.connectionDoesNotExist()
- }
- block
- } finally lock.unlock()
+  private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) {
+    // Under the lock, refuse to run `block` once the transport has been closed.
+    val transportOpen = protocol.getTransport.isOpen
+    if (!transportOpen) throw KyuubiSQLException.connectionDoesNotExist()
+    block
}
private def withLockAcquiredAsyncRequest[T](block: => T): T = withLockAcquired {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index a723ab4b0..e6433cdc9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -165,7 +165,7 @@ class BatchJobSubmission(
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
// we can not set to other state if it is canceled
- private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
+ private def setStateIfNotCanceled(newState: OperationState): Unit = withLockRequired {
if (state != CANCELED) {
setState(newState)
applicationId(_applicationInfo).foreach { appId =>
@@ -318,7 +318,7 @@ class BatchJobSubmission(
}
}
- override def close(): Unit = state.synchronized {
+ override def close(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
try {
getOperationLog.foreach(_.close())
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index a6b960847..44b2becd7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -56,7 +56,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
case e: Throwable =>
- state.synchronized {
+ withLockRequired {
if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId", e)
} else {
@@ -98,14 +98,14 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
override protected def afterRun(): Unit = {
- state.synchronized {
+ withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
}
}
- override def cancel(): Unit = state.synchronized {
+ override def cancel(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
setState(OperationState.CANCELED)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
@@ -120,7 +120,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}
- override def close(): Unit = state.synchronized {
+ override def close(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
setState(OperationState.CLOSED)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))