Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/04 12:35:18 UTC

[GitHub] [spark] tedyu commented on pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

tedyu commented on PR #38651:
URL: https://github.com/apache/spark/pull/38651#issuecomment-1336401312

   I think we can do more than setting a default value for `KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD`.
   
   What do you think of the following change?
   
   Thanks
   ```
   diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   index 49ab1d3248..bac171336a 100644
   --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   @@ -29,6 +29,7 @@ import scala.util.control.NonFatal
    import io.fabric8.kubernetes.api.model.Pod
   
    import org.apache.spark.{SparkConf, SparkContext}
   +import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD
    import org.apache.spark.internal.Logging
    import org.apache.spark.util.Clock
   @@ -98,7 +99,15 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
   
      override def stop(): Unit = {
        pollingTasks.asScala.foreach(_.cancel(false))
   -    val awaitSeconds = conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
   +    var awaitSeconds = conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
   +    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
   +    if (hadoopConf.get("hadoop.service.shutdown.timeout") != null) {
   +      val hadoopTimeout = hadoopConf.get("hadoop.service.shutdown.timeout").toLong
   +      val GRACE = 8
   +      if (hadoopTimeout - GRACE <= awaitSeconds) {
   +        awaitSeconds = hadoopTimeout - GRACE
   +      }
   +    }
        ThreadUtils.shutdown(subscribersExecutor, FiniteDuration(awaitSeconds, TimeUnit.SECONDS))
      }
   ```
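   
   For illustration, here is a minimal, standalone sketch of the capping logic above. The names `ShutdownGraceSketch` and `cappedAwaitSeconds` are made up for this example, and the 30-second figure assumes Hadoop's usual default for `hadoop.service.shutdown.timeout`. One caveat the patch does not handle: recent Hadoop releases ship that property with a time-unit suffix (e.g. `30s`) in core-default.xml, so a plain `.toLong` can throw; reading it with Hadoop's `Configuration.getTimeDuration` would be safer. The sketch assumes the timeout has already been resolved to whole seconds:
   
   ```
   object ShutdownGraceSketch {
     // Cap the subscriber-executor shutdown wait so it finishes before Hadoop's
     // shutdown-hook timeout, keeping a small safety margin (graceSeconds) for
     // the rest of stop(). Mirrors the logic in the diff above; the Hadoop
     // timeout is assumed to be already resolved to whole seconds.
     def cappedAwaitSeconds(
         configuredAwaitSeconds: Long,
         hadoopShutdownTimeoutSeconds: Option[Long],
         graceSeconds: Long = 8L): Long = {
       hadoopShutdownTimeoutSeconds match {
         case Some(t) if t - graceSeconds <= configuredAwaitSeconds => t - graceSeconds
         case _ => configuredAwaitSeconds
       }
     }
   
     def main(args: Array[String]): Unit = {
       // With a 30s Hadoop shutdown-hook timeout, a 60s grace period is capped
       // to 22s, while a 10s grace period is left untouched.
       println(cappedAwaitSeconds(60L, Some(30L)))  // 22
       println(cappedAwaitSeconds(10L, Some(30L)))  // 10
       println(cappedAwaitSeconds(60L, None))       // 60
     }
   }
   ```
   
   One design point worth considering: if the Hadoop timeout is smaller than `GRACE`, the capped value goes negative, so the patch may also want a floor at zero.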


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
