You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2017/11/29 07:02:29 UTC

[2/2] spark git commit: [SPARK-18278][SCHEDULER] Spark on Kubernetes - Basic Scheduler Backend

[SPARK-18278][SCHEDULER] Spark on Kubernetes - Basic Scheduler Backend

## What changes were proposed in this pull request?

This is a stripped down version of the `KubernetesClusterSchedulerBackend` for Spark with the following components:
- Static Allocation of Executors
- Executor Pod Factory
- Executor Recovery Semantics

It's step 1 from the step-wise plan documented [here](https://github.com/apache-spark-on-k8s/spark/issues/441#issuecomment-330802935).
This addition is covered by the [SPIP vote](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html) which passed on Aug 31.

## How was this patch tested?

- The patch contains unit tests which are passing.
- Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
- It is a **subset** of the entire changelist hosted in http://github.com/apache-spark-on-k8s/spark which is in active use in several organizations.
- There is integration testing enabled in the fork currently [hosted by PepperData](http://spark-k8s-jenkins.pepperdata.org:8080) which is being moved over to RiseLAB CI.
- Detailed documentation on trying out the patch in its entirety is in: https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah ash211 ssuchter varunkatta kimoonkim erikerlandson liyinan926 tnachen ifilonenko

Author: Yinan Li <li...@gmail.com>
Author: foxish <ra...@google.com>
Author: mcheah <mc...@palantir.com>

Closes #19468 from foxish/spark-kubernetes-3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b2070a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b2070a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b2070a

Branch: refs/heads/master
Commit: e9b2070ab2d04993b1c0c1d6c6aba249e6664c8d
Parents: 475a29f
Author: Yinan Li <li...@gmail.com>
Authored: Tue Nov 28 23:02:09 2017 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 28 23:02:09 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 NOTICE                                          |   6 +
 .../cluster/SchedulerBackendUtils.scala         |  47 ++
 dev/sparktestsupport/modules.py                 |   8 +
 docs/configuration.md                           |   4 +-
 pom.xml                                         |   7 +
 project/SparkBuild.scala                        |   8 +-
 resource-managers/kubernetes/core/pom.xml       | 100 +++++
 .../org/apache/spark/deploy/k8s/Config.scala    | 123 ++++++
 .../spark/deploy/k8s/ConfigurationUtils.scala   |  41 ++
 .../org/apache/spark/deploy/k8s/Constants.scala |  50 +++
 .../k8s/SparkKubernetesClientFactory.scala      | 102 +++++
 .../cluster/k8s/ExecutorPodFactory.scala        | 219 +++++++++
 .../cluster/k8s/KubernetesClusterManager.scala  |  70 +++
 .../k8s/KubernetesClusterSchedulerBackend.scala | 442 +++++++++++++++++++
 .../core/src/test/resources/log4j.properties    |  31 ++
 .../cluster/k8s/ExecutorPodFactorySuite.scala   | 135 ++++++
 ...KubernetesClusterSchedulerBackendSuite.scala | 440 ++++++++++++++++++
 .../spark/deploy/yarn/YarnAllocator.scala       |   3 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  24 -
 .../cluster/YarnClientSchedulerBackend.scala    |   2 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   2 +-
 22 files changed, 1832 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d7e9f8c..05b94ade 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -43,7 +43,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
 
 # 6. Run lint-java.
 script:

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index f4b64b5..6ec240e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
 Apache Commons Pool
 Copyright 1999-2009 The Apache Software Foundation
 
+This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
+Copyright (C) 2015 Red Hat, Inc.
+
+This product includes/uses OkHttp (https://github.com/square/okhttp)
+Copyright (C) 2012 The Android Open Source Project
+
 =========================================================================
 ==  NOTICE file corresponding to section 4(d) of the Apache License,   ==
 ==  Version 2.0, in this case for the DataNucleus distribution.        ==

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
new file mode 100644
index 0000000..c166d03
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
+import org.apache.spark.util.Utils
+
+private[spark] object SchedulerBackendUtils {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
+  /**
+   * Returns the initial target number of executors. With dynamic allocation enabled this is
+   * the initial executor count from `conf`, which must lie within the configured [min, max]
+   * bounds; otherwise it is EXECUTOR_INSTANCES, falling back to `numExecutors` when unset.
+   */
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
+      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index dacc89f..44f990e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -532,6 +532,14 @@ mesos = Module(
     sbt_test_goals=["mesos/test"]
 )
 
+kubernetes = Module(
+    name="kubernetes",
+    dependencies=[],
+    source_file_regexes=["resource-managers/kubernetes/core"],
+    build_profile_flags=["-Pkubernetes"],
+    sbt_test_goals=["kubernetes/test"]
+)
+
 # The root module is a dummy module which is used to run all of the tests.
 # No other modules should directly depend on this module.
 root = Module(

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 9b9583d..e42f866 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1438,10 +1438,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
+  <td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
-    (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained
+    (resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarse-grained
      mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
     to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b2c629..7bc66e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2665,6 +2665,13 @@
     </profile>
 
     <profile>
+      <id>kubernetes</id>
+      <modules>
+        <module>resource-managers/kubernetes/core</module>
+      </modules>
+    </profile>
+
+    <profile>
       <id>hive-thriftserver</id>
       <modules>
         <module>sql/hive-thriftserver</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c726ec2..7570338 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -53,11 +53,11 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
-  val optionallyEnabledProjects@Seq(mesos, yarn,
+  val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
     streamingFlumeSink, streamingFlume,
     streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
     dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn",
+    Seq("kubernetes", "mesos", "yarn",
       "streaming-flume-sink", "streaming-flume",
       "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
 
     unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
       ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
new file mode 100644
index 0000000..7d35aea
--- /dev/null
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes</name>
+  <properties>
+    <sbt.project.name>kubernetes</sbt.project.name>
+    <kubernetes.client.version>3.0.0</kubernetes.client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.dataformat</groupId>
+          <artifactId>jackson-dataformat-yaml</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Required by kubernetes-client but we exclude it -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${fasterxml.jackson.version}</version>
+    </dependency>
+
+    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <!-- End of shaded deps. -->
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>3.8.1</version>
+    </dependency>
+
+  </dependencies>
+
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
new file mode 100644
index 0000000..f0742b9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+    ConfigBuilder("spark.kubernetes.namespace")
+      .doc("The namespace that will be used for running the driver and executor pods. When using " +
+        "spark-submit in cluster mode, this can also be passed to spark-submit via the " +
+        "--kubernetes-namespace command line argument.")
+      .stringConf
+      .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+    ConfigBuilder("spark.kubernetes.executor.docker.image")
+      .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
+        "format.")
+      .stringConf
+      .createOptional
+
+  val DOCKER_IMAGE_PULL_POLICY =
+    ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+      .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
+      .stringConf
+      .checkValues(Set("Always", "Never", "IfNotPresent"))
+      .createWithDefault("IfNotPresent")
+
+  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+      "spark.kubernetes.authenticate.driver"
+  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+      "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+    ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+      .doc("Service account that is used when running the driver pod. The driver pod uses " +
+        "this service account when requesting executor pods from the API server. If specific " +
+        "credentials are given for the driver pod to use, the driver will favor " +
+        "using those credentials instead.")
+      .stringConf
+      .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dynamically determined
+  // based on the executor memory.
+  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
+      .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
+        "is memory that accounts for things like VM overheads, interned strings, other native " +
+        "overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
+      .bytesConf(ByteUnit.MiB)
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
+  val KUBERNETES_DRIVER_POD_NAME =
+    ConfigBuilder("spark.kubernetes.driver.pod.name")
+      .doc("Name of the driver pod.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
+      .doc("Prefix to use in front of the executor pod names.")
+      .internal()
+      .stringConf
+      .createWithDefault("spark")
+
+  val KUBERNETES_ALLOCATION_BATCH_SIZE =
+    ConfigBuilder("spark.kubernetes.allocation.batch.size")
+      .doc("Number of pods to launch at once in each round of executor allocation.")
+      .intConf
+      .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
+      .createWithDefault(5)
+
+  val KUBERNETES_ALLOCATION_BATCH_DELAY =
+    ConfigBuilder("spark.kubernetes.allocation.batch.delay")
+      .doc("Number of seconds to wait between each round of executor allocation.")
+      .longConf
+      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
+      .createWithDefault(1)
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
+    ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
+      .doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
+        "before it is assumed that the executor failed.")
+      .intConf
+      .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
+        "must be a positive integer")
+      .createWithDefault(10)
+
+  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
new file mode 100644
index 0000000..0171747
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+
+private[spark] object ConfigurationUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing the configuration property keys and values
+   */
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String): Map[String, String] = {
+    sparkConf.getAllWithPrefix(prefix).toMap
+  }
+
+  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
+    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
new file mode 100644
index 0000000..4ddeefb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+private[spark] object Constants {
+
+  // Labels
+  val SPARK_APP_ID_LABEL = "spark-app-selector"
+  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+  val SPARK_ROLE_LABEL = "spark-role"
+  val SPARK_POD_DRIVER_ROLE = "driver"
+  val SPARK_POD_EXECUTOR_ROLE = "executor"
+
+  // Default and fixed ports
+  val DEFAULT_DRIVER_PORT = 7078
+  val DEFAULT_BLOCKMANAGER_PORT = 7079
+  val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+  val EXECUTOR_PORT_NAME = "executor"
+
+  // Environment Variables
+  val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
+  val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
+  val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
+  val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
+  val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
+  val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+  val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
+  val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
+  val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
+  val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
+
+  // Miscellaneous
+  val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+  val MEMORY_OVERHEAD_FACTOR = 0.10
+  val MEMORY_OVERHEAD_MIN_MIB = 384L
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
new file mode 100644
index 0000000..1e3f055
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object SparkKubernetesClientFactory {
+
+  def createKubernetesClient(
+      master: String,
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      defaultServiceAccountToken: Option[File],
+      defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
+    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
+      .map(new File(_))
+      .orElse(defaultServiceAccountToken)
+    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
+    ConfigurationUtils.requireNandDefined(
+      oauthTokenFile,
+      oauthTokenValue,
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
+        s"value $oauthTokenConf.")
+
+    val caCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+      .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
+    val clientKeyFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+    val clientCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+    val dispatcher = new Dispatcher(
+      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(master)
+      .withWebsocketPingInterval(0)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }.withOption(oauthTokenFile) {
+        (file, configBuilder) =>
+            configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
+      }.withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }.withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }.withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }.withOption(namespace) {
+        (ns, configBuilder) => configBuilder.withNamespace(ns)
+      }.build()
+    val baseHttpClient = HttpClientUtils.createHttpClient(config)
+    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
+      .dispatcher(dispatcher)
+      .build()
+    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
+  }
+
+  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
+    extends AnyVal {
+
+    def withOption[T]
+        (option: Option[T])
+        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
+      option.map { opt =>
+        configurator(opt, configBuilder)
+      }.getOrElse(configBuilder)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
new file mode 100644
index 0000000..f79155b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   *
+   * @param executorId identifier of the executor the pod will run
+   * @param applicationId identifier of the Spark application the executor belongs to
+   * @param driverUrl URL the executor uses to reach the driver
+   * @param executorEnvs extra environment variables, as (name, value) pairs
+   * @param driverPod the pod running the driver
+   * @param nodeToLocalTaskCount per-node task counts, keyed by node
+   *                             (presumably for locality-aware placement; confirm with callers)
+   * @return the executor pod description
+   */
+  def createExecutorPod(
+      executorId: String,
+      applicationId: String,
+      driverUrl: String,
+      executorEnvs: Seq[(String, String)],
+      driverPod: Pod,
+      nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  // Optional extra classpath for executors; exported to the container as an env var.
+  private val executorExtraClasspath =
+    sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  // User-supplied labels. The labels Spark sets itself on every executor pod are rejected
+  // up front so that user configuration cannot override them.
+  private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+    sparkConf,
+    KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  require(
+    !executorLabels.contains(SPARK_APP_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+  require(
+    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+      " Spark.")
+  require(
+    !executorLabels.contains(SPARK_ROLE_LABEL),
+    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+
+  // User-supplied annotations and node selector, applied verbatim to each executor pod.
+  private val executorAnnotations =
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  private val nodeSelector =
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  // The executor image is mandatory; construction fails with a SparkException when it is unset.
+  private val executorDockerImage = sparkConf
+    .get(EXECUTOR_DOCKER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the executor Docker image"))
+  private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val blockManagerPort = sparkConf
+    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  // Executor memory is read twice: once as a number for resource quantities, and once as the
+  // raw configuration string that is handed to the executor through its environment.
+  private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  // Explicitly configured overhead if present; otherwise the larger of
+  // MEMORY_OVERHEAD_FACTOR * executor memory and MEMORY_OVERHEAD_MIN_MIB.
+  private val memoryOverheadMiB = sparkConf
+    .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+
+  // Read as a Double so that fractional values can be used for the pod's CPU request.
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
+  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  /**
+   * Builds the pod description for a single executor.
+   *
+   * The pod is named `<prefix>-exec-<executorId>`, labelled with the executor id, application
+   * id and executor role (plus any user labels), owned by the driver pod, and holds a single
+   * container named "executor". Memory is requested at the executor memory size and limited at
+   * executor memory plus overhead; CPU is requested at the configured core count and limited
+   * only when a core limit is configured.
+   *
+   * @param executorId identifier of the executor; used in the pod name, a label and the env
+   * @param applicationId identifier of the Spark application; used in a label and the env
+   * @param driverUrl URL of the driver, exported to the executor through the env
+   * @param executorEnvs additional (name, value) environment variables for the container
+   * @param driverPod the driver pod, recorded as the controlling owner of the executor pod
+   * @param nodeToLocalTaskCount not consulted by this implementation
+   * @return the fully configured executor pod
+   */
+  override def createExecutorPod(
+      executorId: String,
+      applicationId: String,
+      driverUrl: String,
+      executorEnvs: Seq[(String, String)],
+      driverPod: Pod,
+      nodeToLocalTaskCount: Map[String, Int]): Pod = {
+    val name = s"$executorPodNamePrefix-exec-$executorId"
+
+    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
+    // name as the hostname. This preserves uniqueness since the end of name contains
+    // executorId. Truncation can leave a leading '-' or '.', which is not a valid first
+    // character for a hostname, so leading non-alphanumeric characters are dropped.
+    val hostname = name
+      .substring(Math.max(0, name.length - 63))
+      .replaceAll("^[^a-zA-Z0-9]+", "")
+    val resolvedExecutorLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> applicationId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
+      executorLabels
+    val executorMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${executorMemoryMiB}Mi")
+      .build()
+    val executorMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(s"${executorMemoryWithOverhead}Mi")
+      .build()
+    val executorCpuQuantity = new QuantityBuilder(false)
+      .withAmount(executorCores.toString)
+      .build()
+    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
+      new EnvVarBuilder()
+        .withName(ENV_EXECUTOR_EXTRA_CLASSPATH)
+        .withValue(cp)
+        .build()
+    }
+    // Each JVM option gets its own indexed environment variable.
+    val executorExtraJavaOptionsEnv = sparkConf
+      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
+      (ENV_DRIVER_URL, driverUrl),
+      // Executor backend expects integral value for executor cores, so round it up to an int.
+      (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
+      (ENV_APPLICATION_ID, applicationId),
+      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build()
+      ) ++ Seq(
+      // The pod IP is only known at runtime, so it is injected through a field reference.
+      new EnvVarBuilder()
+        .withName(ENV_EXECUTOR_POD_IP)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .build()
+    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+    val requiredPorts = Seq(
+      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+      .map { case (portName, port) =>
+        new ContainerPortBuilder()
+          .withName(portName)
+          .withContainerPort(port)
+          .build()
+      }
+
+    val executorContainer = new ContainerBuilder()
+      .withName("executor")
+      .withImage(executorDockerImage)
+      .withImagePullPolicy(dockerImagePullPolicy)
+      .withNewResources()
+        .addToRequests("memory", executorMemoryQuantity)
+        .addToLimits("memory", executorMemoryLimitQuantity)
+        .addToRequests("cpu", executorCpuQuantity)
+        .endResources()
+      .addAllToEnv(executorEnv.asJava)
+      .withPorts(requiredPorts.asJava)
+      .build()
+
+    val executorPod = new PodBuilder()
+      .withNewMetadata()
+        .withName(name)
+        .withLabels(resolvedExecutorLabels.asJava)
+        .withAnnotations(executorAnnotations.asJava)
+        .withOwnerReferences()
+          .addNewOwnerReference()
+            .withController(true)
+            .withApiVersion(driverPod.getApiVersion)
+            .withKind(driverPod.getKind)
+            .withName(driverPod.getMetadata.getName)
+            .withUid(driverPod.getMetadata.getUid)
+            .endOwnerReference()
+        .endMetadata()
+      .withNewSpec()
+        .withHostname(hostname)
+        .withRestartPolicy("Never")
+        .withNodeSelector(nodeSelector.asJava)
+        .endSpec()
+      .build()
+
+    // A CPU limit is added only when one has been configured.
+    val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainer)
+        .editResources()
+        .addToLimits("cpu", executorCpuLimitQuantity)
+        .endResources()
+        .build()
+    }.getOrElse(executorContainer)
+
+    new PodBuilder(executorPod)
+      .editSpec()
+        .addToContainers(containerWithExecutorLimitCores)
+        .endSpec()
+      .build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
new file mode 100644
index 0000000..68ca6a7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.File
+
+import io.fabric8.kubernetes.client.Config
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * External cluster manager that handles master URLs beginning with "k8s".
+ */
+private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+
+  /** Accepts any master URL that carries the "k8s" prefix. */
+  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+
+  /** Creates a TaskSchedulerImpl bound to the given context. */
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    new TaskSchedulerImpl(sc)
+  }
+
+  /**
+   * Wires up the Kubernetes scheduler backend: a client configured from the Spark conf and the
+   * service-account token and CA certificate paths, an executor pod factory, and the two
+   * daemon thread pools the backend uses for pod allocation and executor requests.
+   */
+  override def createSchedulerBackend(
+      sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    val conf = sc.getConf
+
+    val namespace = Some(conf.get(KUBERNETES_NAMESPACE))
+    val tokenFile = Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH))
+    val caCertFile = Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))
+    val client = SparkKubernetesClientFactory.createKubernetesClient(
+      KUBERNETES_MASTER_INTERNAL_URL,
+      namespace,
+      APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+      conf,
+      tokenFile,
+      caCertFile)
+
+    val podFactory = new ExecutorPodFactoryImpl(conf)
+    val allocatorPool =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+    val requestPool = ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")
+
+    new KubernetesClusterSchedulerBackend(
+      scheduler.asInstanceOf[TaskSchedulerImpl],
+      sc.env.rpcEnv,
+      podFactory,
+      client,
+      allocatorPool,
+      requestPool)
+  }
+
+  /** Hands the backend to the task scheduler so that it can start scheduling against it. */
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    val taskScheduler = scheduler.asInstanceOf[TaskSchedulerImpl]
+    taskScheduler.initialize(backend)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
new file mode 100644
index 0000000..e79c987
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    rpcEnv: RpcEnv,
+    executorPodFactory: ExecutorPodFactory,
+    kubernetesClient: KubernetesClient,
+    allocatorExecutor: ScheduledExecutorService,
+    requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  // Source of executor ids; incremented once for every executor pod that is requested.
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Executor id -> pod, for pods that were successfully created through the API server.
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Pod IP -> pod, filled in by the pod watcher once a pod is reported as running.
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  // Pod name -> exit reason, recorded by the pod watcher on delete and error events.
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
+  // Executor id -> pod, for executors that disconnected or were killed and still have to be
+  // removed by the allocator thread.
+  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  // The driver pod name is mandatory; construction fails with a SparkException when it is unset.
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+  // Execution context backing the futures returned by the doRequest/doKill methods.
+  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
+    requestExecutorsService)
+
+  // Looked up from the API server once, at construction time.
+  private val driverPod = kubernetesClient.pods()
+    .inNamespace(kubernetesNamespace)
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Defaults to 0.8 unless spark.scheduler.minRegisteredResourcesRatio is set explicitly.
+  protected override val minRegisteredRatio =
+    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+      0.8
+    } else {
+      super.minRegisteredRatio
+    }
+
+  // Handle of the pod watch opened in start(); closed and cleared in stop().
+  private val executorWatchResource = new AtomicReference[Closeable]
+  // Target executor count, updated by doRequestTotalExecutors.
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+    conf.get("spark.driver.host"),
+    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  // Delay between two runs of the allocator; scheduled with TimeUnit.SECONDS in start().
+  private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  // Upper bound on the number of executor pods requested in a single allocator run.
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val executorLostReasonCheckMaxAttempts = conf.get(
+    KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
+
+  private val allocatorRunnable = new Runnable {
+
+    // Maintains a map of executor id to count of checks performed to learn the loss reason
+    // for an executor.
+    private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
+
+    /**
+     * One allocation round: first processes disconnected executors, then requests up to
+     * `podAllocationSize` new executor pods if the number of running pods is below the
+     * expected total and every previously requested executor has registered.
+     *
+     * Pod descriptions are built while holding RUNNING_EXECUTOR_PODS_LOCK, the API calls that
+     * create them are made outside the lock, and the successfully created pods are recorded
+     * under the lock again.
+     */
+    override def run(): Unit = {
+      handleDisconnectedExecutors()
+
+      val executorsToAllocate = mutable.Map[String, Pod]()
+      val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
+      val currentTotalExpectedExecutors = totalExpectedExecutors.get
+      val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
+          logDebug("Waiting for pending executors before scaling")
+        } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
+          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+        } else {
+          for (_ <- 0 until math.min(
+            currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
+            val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
+            val executorPod = executorPodFactory.createExecutorPod(
+              executorId,
+              applicationId(),
+              driverUrl,
+              conf.getExecutorEnv,
+              driverPod,
+              currentNodeToLocalTaskCount)
+            executorsToAllocate(executorId) = executorPod
+            logInfo(
+              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+          }
+        }
+      }
+
+      // A strict map is required here: mapValues returns a lazy view, which would defer the
+      // API calls until the result is traversed below (i.e. while holding the lock) and would
+      // repeat them on every further traversal.
+      val allocatedExecutors = executorsToAllocate.map { case (executorId, pod) =>
+        executorId -> Utils.tryLog {
+          kubernetesClient.pods().create(pod)
+        }
+      }
+
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        allocatedExecutors.foreach { case (executorId, attemptedAllocatedExecutor) =>
+          // Failed creations were already logged by tryLog and are simply skipped.
+          attemptedAllocatedExecutor.foreach { successfullyAllocatedExecutor =>
+            runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
+          }
+        }
+      }
+    }
+
+    /**
+     * Reconciles every executor that is pending removal with the exit reasons collected by the
+     * pod watcher. When a reason is known the executor is removed with it; otherwise the check
+     * is retried on later runs until the maximum number of attempts is reached.
+     */
+    def handleDisconnectedExecutors(): Unit = {
+      disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
+        case (executorId, executorPod) =>
+          val podName = executorPod.getMetadata.getName
+          Option(podsWithKnownExitReasons.remove(podName)) match {
+            case None =>
+              removeExecutorOrIncrementLossReasonCheckCount(executorId)
+
+            case Some(executorExited) =>
+              logWarning(
+                s"Removing executor $executorId with loss reason " + executorExited.message)
+              removeExecutor(executorId, executorExited)
+              // Pods whose exit was caused by the application are left on the API server so
+              // that users can still inspect them with "kubectl logs <pod name>" and
+              // "kubectl describe pod <pod name>"; their containers have terminated and no
+              // longer consume CPU or memory. In every other case the pod is deleted from the
+              // API server as well.
+              if (executorExited.exitCausedByApp) {
+                logInfo(s"Executor $executorId exited because of the application.")
+                deleteExecutorFromDataStructures(executorId)
+              } else {
+                logInfo(s"Executor $executorId failed because of a framework error.")
+                deleteExecutorFromClusterAndDataStructures(executorId)
+              }
+          }
+      }
+    }
+
+    /**
+     * Counts one more unsuccessful attempt to learn why `executorId` was lost. Once the
+     * configured maximum has been reached the executor is removed with an unknown reason and
+     * its pod is deleted.
+     */
+    def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+      val attemptsSoFar = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
+      if (attemptsSoFar < executorLostReasonCheckMaxAttempts) {
+        executorReasonCheckAttemptCounts(executorId) = attemptsSoFar + 1
+      } else {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
+        deleteExecutorFromClusterAndDataStructures(executorId)
+      }
+    }
+
+    /**
+     * Drops the executor from the local bookkeeping and, if a running pod was tracked for it,
+     * deletes that pod through the API server.
+     */
+    def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
+      for (pod <- deleteExecutorFromDataStructures(executorId)) {
+        kubernetesClient.pods().delete(pod)
+      }
+    }
+
+    /**
+     * Removes every local trace of `executorId`: its pending-removal entry, its loss-reason
+     * attempt counter, any exit reason recorded for its pod, and its running-pod entry.
+     *
+     * @return the pod that was tracked as running for this executor, if any
+     */
+    def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
+      // Exit reasons are keyed by pod name rather than by executor id, so the name has to be
+      // taken from the pod that was pending removal.
+      Option(disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)).foreach { pod =>
+        podsWithKnownExitReasons.remove(pod.getMetadata.getName)
+      }
+      executorReasonCheckAttemptCounts -= executorId
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.remove(executorId).orElse {
+          logWarning(s"Unable to remove pod for unknown executor $executorId")
+          None
+        }
+      }
+    }
+  }
+
+  /**
+   * True once the number of registered executors has reached the minimum registered ratio of
+   * the initial executor target.
+   */
+  override def sufficientResourcesRegistered(): Boolean = {
+    val requiredExecutors = initialExecutors * minRegisteredRatio
+    totalRegisteredExecutors.get() >= requiredExecutors
+  }
+
+  /**
+   * Starts the backend: opens a watch on the pods labelled with this application's id,
+   * schedules the allocator at a fixed delay and, unless dynamic allocation is enabled,
+   * requests the initial number of executors.
+   */
+  override def start(): Unit = {
+    super.start()
+
+    val podWatch = kubernetesClient
+      .pods()
+      .withLabel(SPARK_APP_ID_LABEL, applicationId())
+      .watch(new ExecutorPodsWatcher())
+    executorWatchResource.set(podWatch)
+
+    allocatorExecutor.scheduleWithFixedDelay(
+      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
+
+    val dynamicAllocation = Utils.isDynamicAllocationEnabled(conf)
+    if (!dynamicAllocation) {
+      doRequestTotalExecutors(initialExecutors)
+    }
+  }
+
+  /**
+   * Shuts the backend down: stops the allocator, stops the parent backend, closes the pod
+   * watch, deletes the executor pods that are still tracked and closes the Kubernetes client.
+   * The cleanup steps are best effort; non-fatal failures are logged and do not prevent the
+   * remaining steps from running.
+   */
+  override def stop(): Unit = {
+    // stop allocation of new resources and caches.
+    allocatorExecutor.shutdown()
+    allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)
+
+    // send stop message to executors so they shut down cleanly
+    super.stop()
+
+    try {
+      // getAndSet(null) ensures the watch is closed at most once.
+      Option(executorWatchResource.getAndSet(null)).foreach(_.close())
+    } catch {
+      // Only non-fatal failures are swallowed, consistent with tryLogNonFatalError below;
+      // fatal errors such as OutOfMemoryError propagate.
+      case scala.util.control.NonFatal(e) =>
+        logWarning("Failed to close the executor pod watcher", e)
+    }
+
+    // then delete the executor pods
+    Utils.tryLogNonFatalError {
+      deleteExecutorPodsOnStop()
+      executorPodsByIPs.clear()
+    }
+    Utils.tryLogNonFatalError {
+      logInfo("Closing kubernetes client")
+      kubernetesClient.close()
+    }
+  }
+
+  /**
+   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
+   *         locality if an executor launches on the cluster node. Nodes that already run one
+   *         of our executors are left out.
+   */
+  private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
+    val remainingNodes = synchronized {
+      mutable.Map[String, Int]() ++ hostToLocalTaskCount
+    }
+
+    // Remove cluster nodes that are running our executors already. A node may be keyed by its
+    // name, its IP or its canonical host name, so those are tried in turn and the lookup stops
+    // at the first key that was present.
+    // TODO: This prefers spreading out executors across nodes. In case users want
+    // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
+    // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
+    executorPodsByIPs.values().asScala.foreach { pod =>
+      if (remainingNodes.remove(pod.getSpec.getNodeName).isEmpty) {
+        if (remainingNodes.remove(pod.getStatus.getHostIP).isEmpty) {
+          val canonicalHostName =
+            InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName
+          remainingNodes.remove(canonicalHostName)
+        }
+      }
+    }
+    remainingNodes.toMap
+  }
+
+  /**
+   * Records the new executor target; the allocator picks it up on its next run. The returned
+   * future always completes with true.
+   */
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    Future[Boolean] {
+      totalExpectedExecutors.set(requestedTotal)
+      true
+    }
+  }
+
+  /**
+   * Stops tracking the given executors as running, marks their pods as pending removal and
+   * deletes those pods through the API server. Unknown executor ids are logged and skipped.
+   */
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
+    val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+      executorIds.flatMap { executorId =>
+        val removedPod = runningExecutorsToPods.remove(executorId)
+        if (removedPod.isDefined) {
+          removedPod.foreach { pod =>
+            disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+          }
+        } else {
+          logWarning(s"Unable to remove pod for unknown executor $executorId")
+        }
+        removedPod
+      }
+    }
+
+    kubernetesClient.pods().delete(podsToDelete: _*)
+    true
+  }
+
+  /**
+   * Empties the running-executor map and deletes every pod that was in it. The pods are copied
+   * out under the lock so that the API call itself is made without holding it.
+   */
+  private def deleteExecutorPodsOnStop(): Unit = {
+    val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+      // toList takes a strict snapshot before the map is cleared.
+      val snapshot = runningExecutorsToPods.values.toList
+      runningExecutorsToPods.clear()
+      snapshot
+    }
+    kubernetesClient.pods().delete(executorPodsToDelete: _*)
+  }
+
+  private class ExecutorPodsWatcher extends Watcher[Pod] {
+
+    private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
+    /**
+     * Handles a watch event for a pod.
+     *
+     * A MODIFIED event for a running pod that is not being deleted records the pod under its
+     * IP. A DELETED or ERROR event forgets the pod's IP, records an exit reason under the pod
+     * name and marks the executor as pending removal so that the allocator thread can finish
+     * the cleanup. Every other event is only logged.
+     *
+     * @param action the kind of event that was observed
+     * @param pod the pod the event refers to
+     */
+    override def eventReceived(action: Action, pod: Pod): Unit = {
+      val podName = pod.getMetadata.getName
+      val podIP = pod.getStatus.getPodIP
+
+      action match {
+        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+            && pod.getMetadata.getDeletionTimestamp == null) =>
+          val clusterNodeName = pod.getSpec.getNodeName
+          logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
+          // ConcurrentHashMap rejects null keys, so the pod is only indexed once it has an IP.
+          if (podIP != null) {
+            executorPodsByIPs.put(podIP, pod)
+          }
+
+        case Action.DELETED | Action.ERROR =>
+          val executorId = getExecutorId(pod)
+          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+          if (podIP != null) {
+            executorPodsByIPs.remove(podIP)
+          }
+
+          // The enclosing case admits only DELETED and ERROR, so a two-way branch covers it.
+          val executorExitReason = if (action == Action.ERROR) {
+            logWarning(s"Received error event of executor pod $podName. Reason: " +
+              pod.getStatus.getReason)
+            executorExitReasonOnError(pod)
+          } else {
+            logWarning(s"Received delete event of executor pod $podName. Reason: " +
+              pod.getStatus.getReason)
+            executorExitReasonOnDelete(pod)
+          }
+          podsWithKnownExitReasons.put(podName, executorExitReason)
+
+          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
+            logWarning(s"Executor with id $executorId was not marked as disconnected, but the " +
+              s"watch received an event of type $action for this executor. The executor may " +
+              "have failed to start in the first place and never registered with the driver.")
+          }
+          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+
+        case _ => logDebug(s"Received event of executor pod $podName: " + action)
+      }
+    }
+
+    /** Logs, at debug level, that the pod watch was closed together with the cause. */
+    override def onClose(cause: KubernetesClientException): Unit =
+      logDebug("Executor pod watch closed.", cause)
+
+    /**
+     * Exit status of the pod, taken from its first container status, or
+     * DEFAULT_CONTAINER_FAILURE_EXIT_STATUS when the pod reports no container statuses.
+     */
+    private def getExecutorExitStatus(pod: Pod): Int = {
+      val statuses = pod.getStatus.getContainerStatuses
+      if (statuses.isEmpty) {
+        DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+      } else {
+        // The first container is assumed to represent the whole pod. This may stop holding
+        // if side-car containers are ever run inside executor pods.
+        getExecutorExitStatus(statuses.get(0))
+      }
+    }
+
+    /**
+     * Exit code of a terminated container, or UNKNOWN_EXIT_CODE when the status carries no
+     * state or the state is not a terminated one.
+     */
+    private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
+      val exitCode = for {
+        state <- Option(containerStatus.getState)
+        terminated <- Option(state.getTerminated)
+      } yield terminated.getExitCode.intValue()
+      exitCode.getOrElse(UNKNOWN_EXIT_CODE)
+    }
+
+    /** True when the pod's executor id is no longer tracked in the running-executor map. */
+    private def isPodAlreadyReleased(pod: Pod): Boolean = {
+      val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+      val stillRunning = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.contains(executorId)
+      }
+      !stillRunning
+    }
+
+    /**
+     * Exit reason for a pod that produced an ERROR event. The exit is attributed to the
+     * application unless the pod had already been released by the driver.
+     */
+    private def executorExitReasonOnError(pod: Pod): ExecutorExited = {
+      val containerExitStatus = getExecutorExitStatus(pod)
+      val releasedByDriver = isPodAlreadyReleased(pod)
+      val podName = pod.getMetadata.getName
+      if (!releasedByDriver) {
+        val containerExitReason = s"Pod $podName's executor container " +
+          s"exited with exit status code $containerExitStatus."
+        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+      } else {
+        // container was probably actively killed by the driver.
+        ExecutorExited(containerExitStatus, exitCausedByApp = false,
+          s"Container in pod $podName exited from explicit termination " +
+            "request.")
+      }
+    }
+
+    /**
+     * Exit reason for a pod that produced a DELETED event. The exit is never attributed to the
+     * application; only the message depends on whether the driver had released the pod.
+     */
+    private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
+      val releasedByDriver = isPodAlreadyReleased(pod)
+      val exitMessage =
+        if (releasedByDriver) {
+          s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
+        } else {
+          s"Pod ${pod.getMetadata.getName} deleted or lost."
+        }
+      val exitStatus = getExecutorExitStatus(pod)
+      ExecutorExited(exitStatus, exitCausedByApp = false, exitMessage)
+    }
+
+    /**
+     * Reads the executor id from the pod's labels.
+     *
+     * @throws IllegalArgumentException if the pod has no executor id label
+     */
+    private def getExecutorId(pod: Pod): String = {
+      val labels = pod.getMetadata.getLabels
+      val executorId = labels.get(SPARK_EXECUTOR_ID_LABEL)
+      require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
+        s"to have label $SPARK_EXECUTOR_ID_LABEL.")
+      executorId
+    }
+  }
+
+  /** Creates the Kubernetes-specific driver endpoint. */
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint =
+    new KubernetesDriverEndpoint(rpcEnv, properties)
+
+  /**
+   * Driver endpoint that, when an executor's connection drops, disables the executor and
+   * marks its pod as pending removal so that the allocator thread can determine the loss
+   * reason.
+   */
+  private class KubernetesDriverEndpoint(
+      rpcEnv: RpcEnv,
+      sparkProperties: Seq[(String, String)])
+    extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      for {
+        executorId <- addressToExecutorId.get(rpcAddress)
+        if disableExecutor(executorId)
+      } {
+        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+          for (pod <- runningExecutorsToPods.get(executorId)) {
+            disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+          }
+        }
+      }
+    }
+  }
+}
+
+private object KubernetesClusterSchedulerBackend {
+  // Exit code reported when a container's real exit code cannot be determined.
+  private val UNKNOWN_EXIT_CODE: Int = -1
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/resources/log4j.properties b/resource-managers/kubernetes/core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ad95fad
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
new file mode 100644
index 0000000..1c7717c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, _}
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Unit tests for `ExecutorPodFactoryImpl`: checks the metadata, container resources,
+ * environment variables and owner references of the executor pods it creates.
+ */
+class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+  private val driverPodName: String = "driver-pod"
+  private val driverPodUid: String = "driver-uid"
+  private val executorPrefix: String = "base"
+  private val executorImage: String = "executor-image"
+
+  // Driver pod fixture handed to the factory; its UID is what the owner-reference checks expect.
+  private val driverPod = new PodBuilder()
+    .withNewMetadata()
+    .withName(driverPodName)
+    .withUid(driverPodUid)
+    .endMetadata()
+    .withNewSpec()
+    .withNodeName("some-node")
+    .endSpec()
+    .withNewStatus()
+    .withHostIP("192.168.99.100")
+    .endStatus()
+    .build()
+
+  // Rebuilt before every test so that no test observes another test's configuration.
+  private var baseConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    baseConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
+      .set(EXECUTOR_DOCKER_IMAGE, executorImage)
+  }
+
+  test("basic executor pod has reasonable defaults") {
+    val executor = createExecutor(baseConf)
+
+    // The executor pod name and default labels.
+    val metadata = executor.getMetadata
+    assert(metadata.getName === s"$executorPrefix-exec-1")
+    val labels = metadata.getLabels
+    assert(labels.size() === 3)
+    assert(labels.get(SPARK_EXECUTOR_ID_LABEL) === "1")
+
+    // Exactly one container, no volume mounts, and the default memory limit:
+    // 1024M of executor memory + 384M (minimum overhead constant).
+    val containers = executor.getSpec.getContainers
+    assert(containers.size() === 1)
+    val container = containers.get(0)
+    assert(container.getImage === executorImage)
+    assert(container.getVolumeMounts.isEmpty)
+    val limits = container.getResources.getLimits
+    assert(limits.size() === 1)
+    assert(limits.get("memory").getAmount === "1408Mi")
+
+    // The pod has neither a node selector nor volumes.
+    val spec = executor.getSpec
+    assert(spec.getNodeSelector.isEmpty)
+    assert(spec.getVolumes.isEmpty)
+
+    checkEnv(executor, Map.empty[String, String])
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  test("executor pod hostnames get truncated to 63 characters") {
+    // A prefix that is already longer than 63 characters on its own.
+    val longPrefixConf = baseConf.clone()
+      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
+        "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
+
+    val executor = createExecutor(longPrefixConf)
+
+    assert(executor.getSpec.getHostname.length === 63)
+  }
+
+  test("classpath and extra java options get translated into environment variables") {
+    val conf = baseConf.clone()
+      .set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+      .set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
+
+    val executor = createExecutor(conf, Seq("qux" -> "quux"))
+
+    val expectedExtraEnvs = Map(
+      "SPARK_JAVA_OPT_0" -> "foo=bar",
+      "SPARK_EXECUTOR_EXTRA_CLASSPATH" -> "bar=baz",
+      "qux" -> "quux")
+    checkEnv(executor, expectedExtraEnvs)
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  // Creates the pod for executor "1" from `conf`, using "dummy" for the two remaining string
+  // arguments, the driver pod fixture as owner, and no node locality information.
+  private def createExecutor(
+      conf: SparkConf,
+      executorEnvs: Seq[(String, String)] = Seq.empty[(String, String)]): Pod = {
+    new ExecutorPodFactoryImpl(conf).createExecutorPod(
+      "1", "dummy", "dummy", executorEnvs, driverPod, Map.empty[String, Int])
+  }
+
+  // There is always exactly one controller reference, and it points to the driver pod.
+  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
+    val ownerReferences = executor.getMetadata.getOwnerReferences
+    assert(ownerReferences.size() === 1)
+    val owner = ownerReferences.get(0)
+    assert(owner.getUid === driverPodUid)
+    assert(owner.getController === true)
+  }
+
+  // Check that the container's environment is exactly the default variables plus
+  // `additionalEnvVars`.
+  private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
+    // ENV_EXECUTOR_POD_IP is expected to have no literal value. NOTE(review): presumably the
+    // factory fills it from a field reference instead; confirm against ExecutorPodFactoryImpl.
+    val expectedEnvs = Map(
+      ENV_EXECUTOR_ID -> "1",
+      ENV_DRIVER_URL -> "dummy",
+      ENV_EXECUTOR_CORES -> "1",
+      ENV_EXECUTOR_MEMORY -> "1g",
+      ENV_APPLICATION_ID -> "dummy",
+      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+
+    val containers = executor.getSpec.getContainers
+    assert(containers.size() === 1)
+    val envVars = containers.get(0).getEnv
+    assert(envVars.size() === expectedEnvs.size)
+    val actualEnvs = envVars.asScala.map(env => env.getName -> env.getValue).toMap
+    assert(expectedEnvs === actualEnvs)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org