You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/08 14:48:51 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2250] Support to limit the spark engine max running time

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc14657b [KYUUBI #2250] Support to limit the spark engine max running time
4bc14657b is described below

commit 4bc14657bf183ad8d74c0cc053fbcd17225e53c8
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Fri Apr 8 22:48:39 2022 +0800

    [KYUUBI #2250] Support to limit the spark engine max running time
    
    ### _Why are the changes needed?_
    
    To close #2250
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2295 from lightning-L/kyuubi-2250.
    
    Closes #2250
    
    71851bf8 [Tianlin Liao] [KYUUBI #2250] limit the spark engine max running time
    7314df84 [Tianlin Liao] [KYUUBI #2250] add method to shutdown threadpool executor in ThreadUtils
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 docs/deployment/settings.md                        |  1 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 60 ++++++++++++++++++++--
 .../engine/spark/ShareLevelSparkEngineSuite.scala  | 29 ++++++++++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 +++
 .../org/apache/kyuubi/session/SessionManager.scala | 21 ++------
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala | 21 +++++++-
 6 files changed, 116 insertions(+), 24 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 2bcc7fa72..7c5d63d97 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -339,6 +339,7 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.session.engine.request.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting response after sending request to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
 <code>kyuubi.session.engine.share.level</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.spark.main.resource</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
+<code>kyuubi.session.engine.spark.max.lifetime</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max lifetime for spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
 <code>kyuubi.session.engine.spark.progress.timeFormat</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>yyyy-MM-dd HH:mm:ss.SSS</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The time format of the progress bar</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
 <code>kyuubi.session.engine.spark.progress.update.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Update period of progress bar.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
 <code>kyuubi.session.engine.spark.showProgress</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, show the progress bar in the spark engine log.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6ff551f2b..191825f06 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -18,9 +18,10 @@
 package org.apache.kyuubi.engine.spark
 
 import java.time.Instant
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import org.apache.spark.{ui, SparkConf}
@@ -38,15 +39,21 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
 import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
 import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies}
 import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.util.SignalRegister
+import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
 
 case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
 
   override val backendService = new SparkSQLBackendService(spark)
   override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
 
+  @volatile private var shutdown = false // set by stop(); the lifetime check skips once true
+  @volatile private var deregistered = false // set once the lifetime check left discovery
+
+  private val lifetimeTerminatingChecker = // executor running the max-lifetime check task
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker")
+
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
     spark.sparkContext.addSparkListener(listener)
@@ -64,11 +71,58 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
       assert(currentEngine.isDefined)
       currentEngine.get.stop()
     })
+
+    startLifetimeTerminatingChecker(() => { // schedules nothing unless max lifetime > 0
+      assert(currentEngine.isDefined)
+      currentEngine.get.stop()
+    })
+  }
+
+  override def stop(): Unit = synchronized {
+    // Idempotent; the flag is raised first so a concurrent lifetime check stops acting.
+    if (!shutdown) {
+      shutdown = true
+      super.stop()
+      val timeout = Duration(conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT), TimeUnit.MILLISECONDS)
+      ThreadUtils.shutdown(lifetimeTerminatingChecker, timeout)
+    }
   }
 
   override protected def stopServer(): Unit = {
     countDownLatch.countDown()
   }
+
+  /** Past the max lifetime (if > 0), deregister, then `stop` once no session is open. */
+  private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = {
+    val interval = conf.get(ENGINE_CHECK_INTERVAL)
+    val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
+    if (maxLifetime > 0) {
+      val checkTask = new Runnable {
+        override def run(): Unit = try {
+          if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) {
+            if (!deregistered) {
+              info(s"Spark engine has been running for more than $maxLifetime ms," +
+                s" deregistering from engine discovery space.")
+              frontendServices.flatMap(_.discoveryService).foreach {
+                case engineServiceDiscovery: EngineServiceDiscovery => engineServiceDiscovery.stop()
+                case _ => // total match: a MatchError would cancel this periodic task
+              }
+              deregistered = true
+            }
+            if (backendService.sessionManager.getOpenSessionCount <= 0) {
+              info(s"Spark engine has been running for more than $maxLifetime ms" +
+                s" and no open session now, terminating")
+              stop()
+            }
+          }
+        } catch { // an escaping exception would suppress every later run of this task
+          case NonFatal(e) => warn("Failed to check the lifetime of the spark engine", e)
+        }
+      }
+      lifetimeTerminatingChecker.scheduleWithFixedDelay(
+        checkTask, interval, interval, TimeUnit.MILLISECONDS)
+    }
+  }
 }
 
 object SparkSQLEngine extends Logging {
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
index 008f55323..47ed53ce4 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
@@ -19,7 +19,12 @@ package org.apache.kyuubi.engine.spark
 
 import java.util.UUID
 
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -33,7 +38,10 @@ abstract class ShareLevelSparkEngineSuite
   extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
   def shareLevel: ShareLevel
   override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++ Map(ENGINE_SHARE_LEVEL.key -> shareLevel.toString)
+    super.withKyuubiConf ++ Map(
+      ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+      ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", // suite-wide: every test shares this lifetime
+      ENGINE_CHECK_INTERVAL.key -> "PT5s") // how often the lifetime check runs
   }
   override protected def jdbcUrl: String = getJdbcUrl
   override val namespace: String = {
@@ -57,6 +65,25 @@ abstract class ShareLevelSparkEngineSuite
       }
     }
   }
+
+  test("test spark engine max life-time") {
+    withZkClient { zkClient =>
+      assert(engine.getServiceState == ServiceState.STARTED)
+      assert(zkClient.checkExists().forPath(namespace) != null)
+      // Open one JDBC session; once its lifetime expires the engine must retire itself.
+      withJdbcStatement() { _ => }
+      eventually(Timeout(30.seconds)) {
+        // Every share level stops; only CONNECTION is expected to drop its namespace node.
+        assert(engine.getServiceState == ServiceState.STOPPED)
+        val namespaceNode = zkClient.checkExists().forPath(namespace)
+        if (shareLevel == ShareLevel.CONNECTION) {
+          assert(namespaceNode == null)
+        } else {
+          assert(namespaceNode != null)
+        }
+      }
+    }
+  }
 }
 
 class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bb54b37a6..4e8fb9b51 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -622,6 +622,14 @@ object KyuubiConf {
       .stringConf
       .createOptional
 
+  val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.spark.max.lifetime")
+      .doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" +
+        " end of life. 0 or negative means not to self-terminate.")
+      .version("1.6.0")
+      .timeConf // read as milliseconds by the Spark engine's lifetime checker
+      .createWithDefault(0) // 0 (or any non-positive value) disables the check
+
   val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("kyuubi.session.engine.flink.main.resource")
       .doc("The package used to create Flink SQL engine remote job. If it is undefined," +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 9aa469418..dfacc9ece 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -22,6 +22,7 @@ import java.nio.file.{Files, Paths}
 import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
@@ -227,25 +228,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
       } else {
         conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
       }
-    timeoutChecker.shutdown()
-    try {
-      timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
-    } catch {
-      case i: InterruptedException =>
-        warn(s"Exceeded to shutdown session timeout checker ", i)
-    }
 
-    if (execPool != null) {
-      execPool.shutdown()
-      try {
-        execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
-      } catch {
-        case e: InterruptedException =>
-          warn(
-            s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
-            e)
-      }
-    }
+    val shutdownGracePeriod = Duration(shutdownTimeout, TimeUnit.MILLISECONDS)
+    Seq(timeoutChecker, execPool).foreach(ThreadUtils.shutdown(_, shutdownGracePeriod))
   }
 
   private def startTimeoutChecker(): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index a540b954d..c5a3944e6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,10 +17,10 @@
 
 package org.apache.kyuubi.util
 
-import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
 
 import scala.concurrent.Awaitable
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import org.apache.kyuubi.{KyuubiException, Logging}
 
@@ -64,4 +64,36 @@ object ThreadUtils extends Logging {
         throw new KyuubiException("Exception thrown in awaitResult: ", e)
     }
   }
+
+  /**
+   * Shuts `executor` down and waits up to `gracePeriod` for its tasks to finish.
+   *
+   * A null `executor` is ignored. A warning is logged when tasks are still running after the
+   * grace period. If the waiting thread is interrupted, this is logged and the interrupt flag
+   * is restored so callers can still observe it. `Duration.Inf` waits without bound; other
+   * non-finite durations do not wait.
+   */
+  def shutdown(
+      executor: ExecutorService,
+      gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
+    val shutdownTimeout = gracePeriod match {
+      // Duration.toMillis throws on non-finite values, which would skip executor.shutdown()
+      case finite: FiniteDuration => finite.toMillis
+      case Duration.Inf => Long.MaxValue
+      case _ => 0L
+    }
+    if (executor != null) {
+      executor.shutdown()
+      try {
+        if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+          warn(s"Exceeded timeout($shutdownTimeout ms) to wait the executor $executor" +
+            " shutdown gracefully")
+        }
+      } catch {
+        case e: InterruptedException =>
+          warn(s"Interrupted while waiting the executor $executor shutdown gracefully", e)
+          Thread.currentThread().interrupt()
+      }
+    }
+  }
 }