You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/23 05:25:25 UTC

[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new ff05cf893 [KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
ff05cf893 is described below

commit ff05cf8934ec95919fecdf8631353c7a15634618
Author: odone <od...@gmail.com>
AuthorDate: Thu Jun 23 13:25:13 2022 +0800

    [KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
    
    Close #2927 for branch-1.5
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2934 from iodone/dev-1.5.
    
    Closes #2927
    
    97d5f924 [odone] [KYUUBI #2927][1.5] fixed
    
    Authored-by: odone <od...@gmail.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../engine/flink/operation/ExecuteStatement.scala  |  2 +-
 .../engine/spark/operation/ExecuteStatement.scala  |  2 +-
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala |  6 ++++-
 .../org/apache/kyuubi/util/ThreadUtilsSuite.scala  | 26 ++++++++++++++++++++++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 96aff0ec1..37dc6f40b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -179,7 +179,7 @@ class ExecuteStatement(
   private def addTimeoutMonitor(): Unit = {
     if (queryTimeout > 0) {
       val timeoutExecutor =
-        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
       val action: Runnable = () => cleanup(OperationState.TIMEOUT)
       timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
       statementTimeoutCleaner = Some(timeoutExecutor)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 7db23874f..c531e1cc9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -127,7 +127,7 @@ class ExecuteStatement(
   private def addTimeoutMonitor(): Unit = {
     if (queryTimeout > 0) {
       val timeoutExecutor =
-        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
       timeoutExecutor.schedule(
         new Runnable {
           override def run(): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index a540b954d..48e8d0f59 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -26,11 +26,15 @@ import org.apache.kyuubi.{KyuubiException, Logging}
 
 object ThreadUtils extends Logging {
 
-  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+  def newDaemonSingleThreadScheduledExecutor(
+      threadName: String,
+      executeExistingDelayedTasksAfterShutdown: Boolean = true): ScheduledExecutorService = {
     val threadFactory = new NamedThreadFactory(threadName, daemon = true)
     val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
     executor.setRemoveOnCancelPolicy(true)
     executor
+      .setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdown)
+    executor
   }
 
   def newDaemonQueuedThreadPool(
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
index 858a71570..6bf0247b6 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
@@ -35,4 +35,30 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
     service.awaitTermination(10, TimeUnit.SECONDS)
     assert(threadName startsWith "ThreadUtilsTest")
   }
+
+  test("New daemon single thread scheduled executor for shutdownNow") {
+    val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
+    @volatile var threadName = ""
+    service.submit(new Runnable {
+      override def run(): Unit = {
+        threadName = Thread.currentThread().getName
+      }
+    })
+    service.shutdownNow()
+    service.awaitTermination(10, TimeUnit.SECONDS)
+    assert(threadName startsWith "")
+  }
+
+  test("New daemon single thread scheduled executor for cancel delayed tasks") {
+    val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest", false)
+    @volatile var threadName = ""
+    service.submit(new Runnable {
+      override def run(): Unit = {
+        threadName = Thread.currentThread().getName
+      }
+    })
+    service.shutdown()
+    service.awaitTermination(10, TimeUnit.SECONDS)
+    assert(threadName startsWith "")
+  }
 }