You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/08 14:48:51 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2250] Support to limit the spark engine max running time
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc14657b [KYUUBI #2250] Support to limit the spark engine max running time
4bc14657b is described below
commit 4bc14657bf183ad8d74c0cc053fbcd17225e53c8
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Fri Apr 8 22:48:39 2022 +0800
[KYUUBI #2250] Support to limit the spark engine max running time
### _Why are the changes needed?_
To close #2250
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2295 from lightning-L/kyuubi-2250.
Closes #2250
71851bf8 [Tianlin Liao] [KYUUBI #2250] limit the spark engine max running time
7314df84 [Tianlin Liao] [KYUUBI #2250] add method to shutdown threadpool executor in ThreadUtils
Authored-by: Tianlin Liao <ti...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
docs/deployment/settings.md | 1 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 60 ++++++++++++++++++++--
.../engine/spark/ShareLevelSparkEngineSuite.scala | 29 ++++++++++-
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++
.../org/apache/kyuubi/session/SessionManager.scala | 21 ++------
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 21 +++++++-
6 files changed, 116 insertions(+), 24 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 2bcc7fa72..7c5d63d97 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -339,6 +339,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.session.engine.request.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting response after sending request to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.session.engine.share.level</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.engine.spark.main.resource</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
+<code>kyuubi.session.engine.spark.max.lifetime</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max lifetime for spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.progress.timeFormat</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>yyyy-MM-dd HH:mm:ss.SSS</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The time format of the progress bar</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.progress.update.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Update period of progress bar.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.showProgress</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, show the progress bar in the spark engine log.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6ff551f2b..191825f06 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -18,9 +18,10 @@
package org.apache.kyuubi.engine.spark
import java.time.Instant
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
@@ -38,15 +39,21 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies}
import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.util.SignalRegister
+import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
+  @volatile private var shutdown = false // set by stop(); read by the lifetime check task
+  @volatile private var deregistered = false // set once the check task stopped discovery
+
+  private val lifetimeTerminatingChecker = // executor the periodic lifetime check runs on
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker")
+
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
@@ -64,11 +71,58 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
+
+ startLifetimeTerminatingChecker(() => {
+ assert(currentEngine.isDefined)
+ currentEngine.get.stop()
+ })
+ }
+
+  /**
+   * Stops the engine services and then shuts the lifetime checker executor down, waiting up to
+   * `ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT` milliseconds for it to terminate.
+   */
+  override def stop(): Unit = synchronized {
+    // Raise the flag before tearing services down so that a lifetime check firing concurrently
+    // becomes a no-op instead of inspecting half-stopped services.
+    shutdown = true
+    try {
+      super.stop()
+    } finally {
+      // Release the checker executor even when stopping the services fails.
+      // NOTE(review): when stop() is reached from the checker task itself, this wait cannot
+      // finish before the timeout because the calling task is still running - confirm intent.
+      val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
+      ThreadUtils.shutdown(
+        lifetimeTerminatingChecker,
+        Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
+    }
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
+
+  /**
+   * Schedules the periodic max-lifetime check when `ENGINE_SPARK_MAX_LIFETIME` is positive;
+   * otherwise does nothing.
+   *
+   * Every `ENGINE_CHECK_INTERVAL` milliseconds the task checks whether the engine has been
+   * running longer than the max lifetime. Once it has, the engine discovery services are
+   * stopped (one time only) and `stop` is invoked as soon as the session manager reports no
+   * open session.
+   *
+   * @param stop callback that terminates the engine
+   */
+  private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = {
+    val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
+    if (maxLifetime > 0) {
+      val interval = conf.get(ENGINE_CHECK_INTERVAL)
+      val checkTask = new Runnable {
+        override def run(): Unit = {
+          // An exception escaping run() makes the scheduler suppress every later execution,
+          // so failures are logged here and the check is retried on the next tick.
+          try {
+            if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) {
+              if (!deregistered) {
+                info(s"Spark engine has been running for more than $maxLifetime ms," +
+                  s" deregistering from engine discovery space.")
+                // foreach with a total match: other discovery service types are skipped
+                // instead of raising a MatchError.
+                frontendServices.flatMap(_.discoveryService).foreach {
+                  case engineServiceDiscovery: EngineServiceDiscovery =>
+                    engineServiceDiscovery.stop()
+                  case _ =>
+                }
+                deregistered = true
+              }
+
+              if (backendService.sessionManager.getOpenSessionCount <= 0) {
+                info(s"Spark engine has been running for more than $maxLifetime ms" +
+                  s" and no open session now, terminating")
+                stop()
+              }
+            }
+          } catch {
+            case NonFatal(e) =>
+              warn("Failed to check the max lifetime of spark engine", e)
+          }
+        }
+      }
+      lifetimeTerminatingChecker.scheduleWithFixedDelay(
+        checkTask,
+        interval,
+        interval,
+        TimeUnit.MILLISECONDS)
+    }
+  }
}
object SparkSQLEngine extends Logging {
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
index 008f55323..47ed53ce4 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
@@ -19,7 +19,12 @@ package org.apache.kyuubi.engine.spark
import java.util.UUID
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -33,7 +38,10 @@ abstract class ShareLevelSparkEngineSuite
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel
override def withKyuubiConf: Map[String, String] = {
- super.withKyuubiConf ++ Map(ENGINE_SHARE_LEVEL.key -> shareLevel.toString)
+ super.withKyuubiConf ++ Map(
+ ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+ ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+ ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
@@ -57,6 +65,25 @@ abstract class ShareLevelSparkEngineSuite
}
}
}
+
+  test("test spark engine max life-time") {
+    withZkClient { zkClient =>
+      // Precondition: the engine is started and its namespace node exists.
+      assert(engine.getServiceState == ServiceState.STARTED)
+      assert(zkClient.checkExists().forPath(namespace) != null)
+      // Run withJdbcStatement with an empty body (presumably opens and closes one connection).
+      withJdbcStatement() { _ => }
+
+      eventually(Timeout(30.seconds)) {
+        // Whatever the share level, the engine has to end up stopped.
+        assert(engine.getServiceState == ServiceState.STOPPED)
+        if (shareLevel == ShareLevel.CONNECTION) {
+          // For the CONNECTION level the namespace node is expected to be gone.
+          assert(zkClient.checkExists().forPath(namespace) == null)
+        } else {
+          // For every other level the namespace node is expected to remain.
+          assert(zkClient.checkExists().forPath(namespace) != null)
+        }
+      }
+    }
+  }
}
class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bb54b37a6..4e8fb9b51 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -622,6 +622,14 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] =
+ buildConf("kyuubi.session.engine.spark.max.lifetime")
+ .doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" +
+ " end of life. 0 or negative means not to self-terminate.")
+ .version("1.6.0")
+ .timeConf
+ .createWithDefault(0)
+
val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote job. If it is undefined," +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 9aa469418..dfacc9ece 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -22,6 +22,7 @@ import java.nio.file.{Files, Paths}
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -227,25 +228,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
} else {
conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
}
- timeoutChecker.shutdown()
- try {
- timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
- } catch {
- case i: InterruptedException =>
- warn(s"Exceeded to shutdown session timeout checker ", i)
- }
- if (execPool != null) {
- execPool.shutdown()
- try {
- execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
- } catch {
- case e: InterruptedException =>
- warn(
- s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
- e)
- }
- }
+ ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
+ ThreadUtils.shutdown(execPool, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
}
private def startTimeoutChecker(): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index a540b954d..c5a3944e6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,10 +17,10 @@
package org.apache.kyuubi.util
-import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.Awaitable
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
import org.apache.kyuubi.{KyuubiException, Logging}
@@ -64,4 +64,21 @@ object ThreadUtils extends Logging {
throw new KyuubiException("Exception thrown in awaitResult: ", e)
}
}
+
+  /**
+   * Initiates an orderly shutdown of `executor` and blocks until it has terminated or
+   * `gracePeriod` has elapsed. A `null` executor is ignored.
+   *
+   * If the wait is interrupted, a warning is logged and the interrupt status of the calling
+   * thread is restored.
+   *
+   * @param executor the executor service to shut down; may be `null`
+   * @param gracePeriod maximum time to wait for termination, 30 seconds by default
+   */
+  def shutdown(
+      executor: ExecutorService,
+      gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
+    val shutdownTimeout = gracePeriod.toMillis
+    if (executor != null) {
+      executor.shutdown()
+      try {
+        // A timeout is reported through the return value, not through an exception.
+        if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+          warn(s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully")
+        }
+      } catch {
+        case e: InterruptedException =>
+          warn("Interrupted while waiting the exec-pool shutdown gracefully", e)
+          // Keep the interrupt visible to callers further up the stack.
+          Thread.currentThread().interrupt()
+      }
+    }
+  }
}