Posted to commits@spark.apache.org by js...@apache.org on 2018/01/26 07:24:13 UTC

spark git commit: [SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore

Repository: spark
Updated Branches:
  refs/heads/master 70a68b328 -> d1721816d


[SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore

## What changes were proposed in this pull request?

When using the Kubernetes cluster manager to run a Streaming workload, many spark.kubernetes.* properties generated by spark-submit must be reset when restoring from a Checkpoint. Otherwise, the values saved in the checkpoint would overwrite the ones produced by the new submission. This matters because the spark-submit code path creates Kubernetes resources, such as a ConfigMap, a Secret and other variables, with autogenerated names, so the names recorded in the checkpoint no longer resolve.

In short, this change enables checkpoint restoration for streaming workloads, and therefore makes Spark Streaming workloads viable on Kubernetes. Before this change, a workload that went down could not be restored from its checkpoint.
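The reload mechanism described above can be modeled with a small, self-contained Scala sketch. It is not Spark's actual `Checkpoint` code. Confs are plain immutable `Map`s instead of `SparkConf`, `CheckpointReloadSketch` and `restore` are hypothetical names, the key list is only a subset of the keys this commit adds, and the pod names are made-up values. The sketch shows that after restoration the Kubernetes names come from the new submission, while other checkpointed settings are kept.

```scala
// Hypothetical model: plain immutable Maps stand in for SparkConf.
object CheckpointReloadSketch {
  type Conf = Map[String, String]

  // A subset of the spark.kubernetes.* keys this commit adds to the reload list.
  val propertiesToReload: Seq[String] = Seq(
    "spark.kubernetes.driver.pod.name",
    "spark.kubernetes.executor.podNamePrefix",
    "spark.kubernetes.initcontainer.executor.configmapname"
  )

  // Start from the checkpointed pairs, then overwrite each reloadable key
  // with the value from the freshly submitted conf, when it defines one.
  def restore(checkpointed: Conf, fresh: Conf): Conf =
    propertiesToReload.foldLeft(checkpointed) { (conf, key) =>
      fresh.get(key) match {
        case Some(value) => conf.updated(key, value)
        case None        => conf
      }
    }

  def main(args: Array[String]): Unit = {
    // Illustrative values: names generated by the original spark-submit run.
    val checkpointed = Map(
      "spark.app.name" -> "twitter-streaming",
      "spark.kubernetes.driver.pod.name" -> "app-1111-driver",
      "spark.kubernetes.executor.podNamePrefix" -> "app-1111"
    )
    // Illustrative values: names generated by the new spark-submit run.
    val fresh = Map(
      "spark.kubernetes.driver.pod.name" -> "app-2222-driver",
      "spark.kubernetes.executor.podNamePrefix" -> "app-2222"
    )
    val restored = restore(checkpointed, fresh)
    println(restored("spark.kubernetes.driver.pod.name")) // app-2222-driver
    println(restored("spark.app.name"))                   // twitter-streaming
  }
}
```

In this model, keys outside the reload list (such as `spark.app.name`) keep their checkpointed values, and keys the new conf sets but the list omits are ignored.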

## How was this patch tested?

This patch was tested with the twitter-streaming example on AWS, storing checkpoints in S3 through Hadoop's s3a:// filesystem.

This is similar to the YARN-related code for resetting a Spark Streaming workload, but for the Kubernetes scheduler. I'm adding the init-container properties because, even though the discussion on the mailing list is not completely settled, my understanding is that they are going forward for now.

For a previous discussion, see the non-rebased work at: https://github.com/apache-spark-on-k8s/spark/pull/516

Author: Santiago Saavedra <ss...@openshine.com>

Closes #20383 from ssaavedra/fix-k8s-checkpointing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1721816
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1721816
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1721816

Branch: refs/heads/master
Commit: d1721816d26bedee3c72eeb75db49da500568376
Parents: 70a68b3
Author: Santiago Saavedra <ss...@openshine.com>
Authored: Fri Jan 26 15:24:06 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Jan 26 15:24:06 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala     | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1721816/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index aed67a5..ed2a896 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -53,6 +53,21 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.host",
       "spark.driver.bindAddress",
       "spark.driver.port",
+      "spark.kubernetes.driver.pod.name",
+      "spark.kubernetes.executor.podNamePrefix",
+      "spark.kubernetes.initcontainer.executor.configmapname",
+      "spark.kubernetes.initcontainer.executor.configmapkey",
+      "spark.kubernetes.initcontainer.downloadJarsResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadJarsSecretLocation",
+      "spark.kubernetes.initcontainer.downloadFilesResourceIdentifier",
+      "spark.kubernetes.initcontainer.downloadFilesSecretLocation",
+      "spark.kubernetes.initcontainer.remoteJars",
+      "spark.kubernetes.initcontainer.remoteFiles",
+      "spark.kubernetes.mountdependencies.jarsDownloadDir",
+      "spark.kubernetes.mountdependencies.filesDownloadDir",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.name",
+      "spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir",
+      "spark.kubernetes.executor.limit.cores",
       "spark.master",
       "spark.yarn.jars",
       "spark.yarn.keytab",
@@ -66,6 +81,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.bindAddress")
+      .remove("spark.kubernetes.driver.pod.name")
       .remove("spark.driver.port")
     val newReloadConf = new SparkConf(loadDefaults = true)
     propertiesToReload.foreach { prop =>
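
The hunk above both removes spark.kubernetes.driver.pod.name from the restored conf and lists it among the properties to reload. The hypothetical sketch below (again using plain `Map`s instead of `SparkConf`; `RemoveThenReloadSketch` and its methods are illustrative names, not Spark API) contrasts the two behaviors. With reload alone, a stale checkpointed value survives whenever the new submission does not set the key. Removing the key first means it is present only if the new submission provides it.

```scala
// Hypothetical model: plain immutable Maps stand in for SparkConf.
object RemoveThenReloadSketch {
  type Conf = Map[String, String]

  // Copy each key from `fresh` into `conf`, but only when `fresh` defines it.
  private def reload(conf: Conf, fresh: Conf, keys: Seq[String]): Conf =
    keys.foldLeft(conf) { (acc, key) =>
      fresh.get(key).fold(acc)(value => acc.updated(key, value))
    }

  // Reload only: a stale checkpointed value survives if `fresh` lacks the key.
  def reloadOnly(checkpointed: Conf, fresh: Conf, keys: Seq[String]): Conf =
    reload(checkpointed, fresh, keys)

  // Remove, then reload: the key ends up present only if `fresh` sets it.
  def removeThenReload(checkpointed: Conf, fresh: Conf, keys: Seq[String]): Conf =
    reload(checkpointed -- keys, fresh, keys)

  def main(args: Array[String]): Unit = {
    val key = "spark.kubernetes.driver.pod.name"
    val checkpointed = Map(key -> "old-driver-pod") // illustrative stale value
    val fresh = Map.empty[String, String]           // new submission omits the key
    println(reloadOnly(checkpointed, fresh, Seq(key)).get(key))       // Some(old-driver-pod)
    println(removeThenReload(checkpointed, fresh, Seq(key)).get(key)) // None
  }
}
```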

