You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/26 04:50:03 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2477] Change state early on stopping

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b70a6a0c [KYUUBI #2477] Change state early on stopping
7b70a6a0c is described below

commit 7b70a6a0cee7351cb5f77acc57b0b993e47177cd
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Apr 26 12:49:51 2022 +0800

    [KYUUBI #2477] Change state early on stopping
    
    ### _Why are the changes needed?_
    
    Change state early on stopping to avoid unnecessary invocations.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2477 from pan3793/stop.
    
    Closes #2477
    
    7b6367ea [Cheng Pan] move deregistered into startLifetimeTerminatingChecker
    67a1a7de [Cheng Pan] Change state early on stopping
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 37 +++++++++-------------
 1 file changed, 15 insertions(+), 22 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d6a7eb797..9c1936157 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -39,7 +39,7 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
 import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
 import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies}
+import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
 
@@ -48,8 +48,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
   override val backendService = new SparkSQLBackendService(spark)
   override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
 
-  @volatile private var shutdown = false
-  @volatile private var deregistered = false
+  private val shutdown = new AtomicBoolean(false)
 
   private val lifetimeTerminatingChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker")
@@ -78,10 +77,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     })
   }
 
-  override def stop(): Unit = synchronized {
+  override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
     super.stop()
-
-    shutdown = true
     val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
     ThreadUtils.shutdown(
       lifetimeTerminatingChecker,
@@ -95,24 +92,20 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
   private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = {
     val interval = conf.get(ENGINE_CHECK_INTERVAL)
     val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
+    val deregistered = new AtomicBoolean(false)
     if (maxLifetime > 0) {
-      val checkTask = new Runnable {
-        override def run(): Unit = {
-          if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) {
-            if (!deregistered) {
-              info(s"Spark engine has been running for more than $maxLifetime ms," +
-                s" deregistering from engine discovery space.")
-              frontendServices.flatMap(_.discoveryService).map {
-                case engineServiceDiscovery: EngineServiceDiscovery => engineServiceDiscovery.stop()
-              }
-              deregistered = true
-            }
+      val checkTask: Runnable = () => {
+        if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
+          if (deregistered.compareAndSet(false, true)) {
+            info(s"Spark engine has been running for more than $maxLifetime ms," +
+              s" deregistering from engine discovery space.")
+            frontendServices.flatMap(_.discoveryService).foreach(_.stop())
+          }
 
-            if (backendService.sessionManager.getOpenSessionCount <= 0) {
-              info(s"Spark engine has been running for more than $maxLifetime ms" +
-                s" and no open session now, terminating")
-              stop()
-            }
+          if (backendService.sessionManager.getOpenSessionCount <= 0) {
+            info(s"Spark engine has been running for more than $maxLifetime ms" +
+              s" and no open session now, terminating")
+            stop()
           }
         }
       }