Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/04 12:35:18 UTC
[GitHub] [spark] tedyu commented on pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
tedyu commented on PR #38651:
URL: https://github.com/apache/spark/pull/38651#issuecomment-1336401312
I think we can do more than set a default value for `KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD`.
What do you think of the following change?
Thanks
```
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 49ab1d3248..bac171336a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 import io.fabric8.kubernetes.api.model.Pod
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Clock
@@ -98,7 +99,15 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
   override def stop(): Unit = {
     pollingTasks.asScala.foreach(_.cancel(false))
-    val awaitSeconds = conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
+    var awaitSeconds = conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    if (hadoopConf.get("hadoop.service.shutdown.timeout") != null) {
+      val hadoopTimeout = hadoopConf.get("hadoop.service.shutdown.timeout").toLong
+      val GRACE = 8
+      if (hadoopTimeout-GRACE <= awaitSeconds) {
+        awaitSeconds = hadoopTimeout-GRACE
+      }
+    }
     ThreadUtils.shutdown(subscribersExecutor, FiniteDuration(awaitSeconds, TimeUnit.SECONDS))
   }
```
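The clamping idea in the diff can be tried in isolation. The sketch below is only an illustration, not part of the PR: `ShutdownGrace`, `parseSeconds`, and `effectiveAwaitSeconds` are hypothetical names, and it has no Spark or Hadoop dependency. It differs from the diff in two ways. If the configured timeout carries a unit suffix such as `30s`, calling `.toLong` on the raw string would throw a `NumberFormatException`, so the sketch parses a few common suffixes. It also floors the result at zero so that a very short Hadoop timeout cannot produce a negative wait. In the real code, Hadoop's `Configuration.getTimeDuration` would likely be a better fit than a hand-written parser.

```scala
import scala.util.Try

// Hypothetical, dependency-free sketch of the clamping logic proposed in the diff.
object ShutdownGrace {
  // Accepts "30", "30s", "2m", "1h" or "1500ms"; the optional group is null when absent.
  private val Duration = """(\d+)\s*(ms|s|m|h)?""".r

  // Parse a duration string into whole seconds; None if the format is not understood.
  def parseSeconds(raw: String): Option[Long] =
    raw.trim.toLowerCase match {
      case Duration(num, unit) =>
        Try(num.toLong).toOption.map { n =>
          unit match {
            case "ms" => n / 1000
            case "m"  => n * 60
            case "h"  => n * 3600
            case _    => n // "s" or no suffix: treat the number as seconds
          }
        }
      case _ => None
    }

  // Cap the configured grace period at (hadoopTimeout - margin), never below zero.
  // With no usable Hadoop timeout, the configured value is returned unchanged.
  def effectiveAwaitSeconds(
      configured: Long,
      hadoopTimeoutRaw: Option[String],
      margin: Long = 8L): Long =
    hadoopTimeoutRaw.flatMap(parseSeconds) match {
      case Some(timeout) => math.max(0L, math.min(configured, timeout - margin))
      case None          => configured
    }
}
```

For example, `ShutdownGrace.effectiveAwaitSeconds(30L, Some("30s"))` returns `22`, while a configured value of `20` is left unchanged because it already fits inside the 22-second window.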
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org