You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/23 05:25:25 UTC
[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new ff05cf893 [KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
ff05cf893 is described below
commit ff05cf8934ec95919fecdf8631353c7a15634618
Author: odone <od...@gmail.com>
AuthorDate: Thu Jun 23 13:25:13 2022 +0800
[KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
Close #2927 for branch-1.5
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2934 from iodone/dev-1.5.
Closes #2927
97d5f924 [odone] [KYUUBI #2927][1.5] fixed
Authored-by: odone <od...@gmail.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../engine/flink/operation/ExecuteStatement.scala | 2 +-
.../engine/spark/operation/ExecuteStatement.scala | 2 +-
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 6 ++++-
.../org/apache/kyuubi/util/ThreadUtilsSuite.scala | 26 ++++++++++++++++++++++
4 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 96aff0ec1..37dc6f40b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -179,7 +179,7 @@ class ExecuteStatement(
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 7db23874f..c531e1cc9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -127,7 +127,7 @@ class ExecuteStatement(
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
timeoutExecutor.schedule(
new Runnable {
override def run(): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index a540b954d..48e8d0f59 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -26,11 +26,15 @@ import org.apache.kyuubi.{KyuubiException, Logging}
object ThreadUtils extends Logging {
- def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+ def newDaemonSingleThreadScheduledExecutor(
+ threadName: String,
+ executeExistingDelayedTasksAfterShutdown: Boolean = true): ScheduledExecutorService = {
val threadFactory = new NamedThreadFactory(threadName, daemon = true)
val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
executor.setRemoveOnCancelPolicy(true)
executor
+ .setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdown)
+ executor
}
def newDaemonQueuedThreadPool(
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
index 858a71570..6bf0247b6 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
@@ -35,4 +35,30 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
service.awaitTermination(10, TimeUnit.SECONDS)
assert(threadName startsWith "ThreadUtilsTest")
}
+
+ test("New daemon single thread scheduled executor for shutdownNow") {
+ val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
+ @volatile var threadName = ""
+ service.submit(new Runnable {
+ override def run(): Unit = {
+ threadName = Thread.currentThread().getName
+ }
+ })
+ service.shutdownNow()
+ service.awaitTermination(10, TimeUnit.SECONDS)
+ assert(threadName startsWith "")
+ }
+
+ test("New daemon single thread scheduled executor for cancel delayed tasks") {
+ val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest", false)
+ @volatile var threadName = ""
+ service.submit(new Runnable {
+ override def run(): Unit = {
+ threadName = Thread.currentThread().getName
+ }
+ })
+ service.shutdown()
+ service.awaitTermination(10, TimeUnit.SECONDS)
+ assert(threadName startsWith "")
+ }
}