You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/26 04:50:03 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2477] Change state early on stopping
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7b70a6a0c [KYUUBI #2477] Change state early on stopping
7b70a6a0c is described below
commit 7b70a6a0cee7351cb5f77acc57b0b993e47177cd
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Apr 26 12:49:51 2022 +0800
[KYUUBI #2477] Change state early on stopping
### _Why are the changes needed?_
Change the state early when stopping to avoid unnecessary invocations.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2477 from pan3793/stop.
Closes #2477
7b6367ea [Cheng Pan] move deregistered into startLifetimeTerminatingChecker
67a1a7de [Cheng Pan] Change state early on stopping
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 37 +++++++++-------------
1 file changed, 15 insertions(+), 22 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d6a7eb797..9c1936157 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -39,7 +39,7 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies}
+import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
@@ -48,8 +48,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
- @volatile private var shutdown = false
- @volatile private var deregistered = false
+ private val shutdown = new AtomicBoolean(false)
private val lifetimeTerminatingChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker")
@@ -78,10 +77,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
})
}
- override def stop(): Unit = synchronized {
+ override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
super.stop()
-
- shutdown = true
val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
ThreadUtils.shutdown(
lifetimeTerminatingChecker,
@@ -95,24 +92,20 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = {
val interval = conf.get(ENGINE_CHECK_INTERVAL)
val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
+ val deregistered = new AtomicBoolean(false)
if (maxLifetime > 0) {
- val checkTask = new Runnable {
- override def run(): Unit = {
- if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) {
- if (!deregistered) {
- info(s"Spark engine has been running for more than $maxLifetime ms," +
- s" deregistering from engine discovery space.")
- frontendServices.flatMap(_.discoveryService).map {
- case engineServiceDiscovery: EngineServiceDiscovery => engineServiceDiscovery.stop()
- }
- deregistered = true
- }
+ val checkTask: Runnable = () => {
+ if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
+ if (deregistered.compareAndSet(false, true)) {
+ info(s"Spark engine has been running for more than $maxLifetime ms," +
+ s" deregistering from engine discovery space.")
+ frontendServices.flatMap(_.discoveryService).foreach(_.stop())
+ }
- if (backendService.sessionManager.getOpenSessionCount <= 0) {
- info(s"Spark engine has been running for more than $maxLifetime ms" +
- s" and no open session now, terminating")
- stop()
- }
+ if (backendService.sessionManager.getOpenSessionCount <= 0) {
+ info(s"Spark engine has been running for more than $maxLifetime ms" +
+ s" and no open session now, terminating")
+ stop()
}
}
}