Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/21 18:58:09 UTC

[GitHub] vanzin closed pull request #22722: [SPARK-24432][k8s] Add support for dynamic resource allocation on Kubernetes

vanzin closed pull request #22722: [SPARK-24432][k8s] Add support for dynamic resource allocation on Kubernetes 
URL: https://github.com/apache/spark/pull/22722
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
new file mode 100644
index 0000000000000..7135d1af5facd
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.kubernetes;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
+import org.apache.spark.network.util.TransportConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A client for talking to the external shuffle service in Kubernetes cluster mode.
+ *
+ * This is used by each Spark executor to register with a corresponding external
+ * shuffle service on the cluster. The purpose is to clean up shuffle files
+ * reliably if the application exits unexpectedly.
+ */
+public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
+  private static final Logger logger = LoggerFactory
+    .getLogger(KubernetesExternalShuffleClient.class);
+
+  private final ScheduledExecutorService heartbeaterThread =
+    Executors.newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("kubernetes-external-shuffle-client-heartbeater")
+        .build());
+
+  /**
+   * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
+   * Please refer to docs on {@link ExternalShuffleClient} for more information.
+   */
+  public KubernetesExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      long registrationTimeoutMs) {
+    super(conf, secretKeyHolder, saslEnabled, registrationTimeoutMs);
+  }
+
+  public void registerDriverWithShuffleService(
+      String host,
+      int port,
+      long heartbeatTimeoutMs,
+      long heartbeatIntervalMs) throws IOException, InterruptedException {
+    checkInit();
+    ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
+  }
+
+  @Override
+  public void close() {
+    heartbeaterThread.shutdownNow();
+    super.close();
+  }
+
+  private class RegisterDriverCallback implements RpcResponseCallback {
+    private final TransportClient client;
+    private final long heartbeatIntervalMs;
+
+    private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
+      this.client = client;
+      this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    @Override
+    public void onSuccess(ByteBuffer response) {
+      heartbeaterThread.scheduleAtFixedRate(
+        new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
+      logger.info("Successfully registered app " + appId + " with external shuffle service.");
+    }
+
+    @Override
+    public void onFailure(Throwable e) {
+      logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+        "Please manually remove shuffle data after driver exit. Error: " + e);
+    }
+  }
+
+  private class Heartbeater implements Runnable {
+
+    private final TransportClient client;
+
+    private Heartbeater(TransportClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout
+      client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
+    }
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 60179f126bc44..3510509f20eee 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
+import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +32,7 @@
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.RegisterDriver;
 import org.apache.spark.network.util.TransportConf;
 
 /**
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index a68a297519b66..f5e51c5cd4929 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -23,8 +23,6 @@
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 
 /**
  * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
similarity index 91%
rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
index d5f53ccb7f741..8e9e38ff0c720 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.shuffle.protocol.mesos;
+package org.apache.spark.network.shuffle.protocol;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 
 // Needed by ScalaDoc. See SPARK-7726
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A message sent from the driver to register with the MesosExternalShuffleService.
+ * A message sent from the driver to register with the ExternalShuffleService.
  */
 public class RegisterDriver extends BlockTransferMessage {
   private final String appId;
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
similarity index 92%
rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java
rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
index b30bb9aed55b6..3f6cb0d0d4798 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.shuffle.protocol.mesos;
+package org.apache.spark.network.shuffle.protocol;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 
 // Needed by ScalaDoc. See SPARK-7726
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A heartbeat sent from the driver to the MesosExternalShuffleService.
+ * A heartbeat sent from the driver to the MesosExternalShuffleService and
+ * KubernetesExternalShuffleService.
  */
 public class ShuffleServiceHeartbeat extends BlockTransferMessage {
   private final String appId;
diff --git a/conf/k8s-shuffle-service.yaml.template b/conf/k8s-shuffle-service.yaml.template
new file mode 100644
index 0000000000000..5fb674b2480c3
--- /dev/null
+++ b/conf/k8s-shuffle-service.yaml.template
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+  labels:
+  #  key1: value1
+  #  key2: value2
+  namespace: spark   # change namespace according to your cluster configuration.
+  name: spark-shuffle-service
+spec:
+  template:
+    metadata:
+      labels:
+        #  key1: value1
+        #  key2: value2
+    spec:
+      # nodeSelector:
+      #  key: "value"     # change this to launch the shuffle service on specific nodes
+      volumes:
+        - name: shuffle-volume
+          hostPath:
+            path: '/tmp/spark-local'  # change this path according to your cluster configuration.
+      containers:
+        - name: shuffle-service
+          args: ["shuffle-service"]
+          image: kubespark/spark:spark-241   # change the image according to your cluster configuration.
+          imagePullPolicy: IfNotPresent
+          volumeMounts:
+            - mountPath: '/tmp/spark-local'   # change this path according to your cluster configuration.
+              name: shuffle-volume
+          resources:
+             requests:
+               cpu: "1"
+             limits:
+               cpu: "1"
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index f6b3c37f0fe72..64eeb54cf2f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -100,6 +100,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   }
 
   def stop() {
+    blockHandler.close()
     if (server != null) {
       server.close()
       server = null
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0fe82ac0cedc5..5ee6e8dfd4fd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -256,8 +256,14 @@ private[spark] class BlockManager(
     blockManagerId = if (idFromMaster != null) idFromMaster else id
 
     shuffleServerId = if (externalShuffleServiceEnabled) {
-      logInfo(s"external shuffle service port = $externalShuffleServicePort")
-      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+      val shuffleServerHostName = if (blockManagerId.isDriver) {
+        blockTransferService.hostName
+      } else {
+        conf.get("spark.shuffle.service.host", blockTransferService.hostName)
+      }
+      logInfo(s"external shuffle service host = $shuffleServerHostName, " +
+        s"port = $externalShuffleServicePort")
+      BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
     } else {
       blockManagerId
     }
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b4088d79addff..9cc077621c56c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -156,6 +156,40 @@ exits.
 
 Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.
 
+## Dynamic Executor Scaling
+
+To enable dynamic resource allocation, you first need to launch the external shuffle service on the nodes that Spark executors will
+be created on. This is typically a [DaemonSet](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) with a provisioned
+[hostpath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume. This shuffle service may be shared by executors
+belonging to different Spark jobs. Using Spark with dynamic allocation on Kubernetes assumes that a cluster administrator has set up one
+or more shuffle-service DaemonSets in the cluster.
+
+A sample configuration file is provided in `conf/k8s-shuffle-service.yaml.template`, which can be customized as needed for a particular cluster.
+It is important that `spec.template.metadata.labels` are set up appropriately for the shuffle service because there may be multiple
+shuffle service instances running in a cluster. The labels give Spark applications a way to target a particular shuffle service.
+
+Please note that when dynamic resource allocation is enabled, an executor's shuffle files must be visible to the shuffle service
+located on the same node, which means the volume mount configuration of the shuffle service should be identical to the Spark executors'.
+
+For example, if the shuffle service we want to use is in the 'spark' namespace and has pods with the label app=spark-shuffle-service, we can use
+that label to target that particular shuffle service at job launch time. To run a job with dynamic allocation enabled,
+the command may then look like the following:
+
+```bash
+$ bin/spark-submit \
+    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
+    --deploy-mode cluster \
+    --name spark-groupby \
+    --class org.apache.spark.examples.GroupByTest \
+    --conf spark.local.dir=/tmp/spark-local \
+    --conf spark.dynamicAllocation.enabled=true \
+    --conf spark.shuffle.service.enabled=true \
+    --conf spark.kubernetes.shuffle.namespace=spark \
+    --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service" \
+    --conf spark.kubernetes.container.image=<spark-image> \
+    local:///path/to/examples.jar 10 400000 2
+```
+
 ## Dependency Management
 
 If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
@@ -391,6 +425,23 @@ specific to Spark on Kubernetes.
     Custom container image to use for executors.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.shuffle.namespace</code></td>
+  <td><code>default</code></td>
+  <td>
+    Namespace in which the shuffle service pods are present.
+    The shuffle service must be created in the cluster before attempting to use it.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.shuffle.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Labels that will be used to look up shuffle service pods.
+    This should be a comma-separated list of label key-value pairs, where each label is in the format <code>key=value</code>.
+    The labels chosen must match exactly one shuffle service pod on each node on which executors are launched.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.container.image.pullPolicy</code></td>
   <td><code>IfNotPresent</code></td>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index fd056bb90e0c4..e58fd27737c5c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -85,6 +85,7 @@
         break;
       case "org.apache.spark.deploy.ExternalShuffleService":
       case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
+      case "org.apache.spark.deploy.k8s.KubernetesExternalShuffleService":
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 71e4d321a0e3a..da0b864f0f8de 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -29,6 +29,30 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("default")
 
+  val KUBERNETES_SHUFFLE_NAMESPACE =
+    ConfigBuilder("spark.kubernetes.shuffle.namespace")
+      .doc("Namespace of the shuffle service")
+      .stringConf
+      .createWithDefault("default")
+
+  val KUBERNETES_SHUFFLE_LABELS =
+    ConfigBuilder("spark.kubernetes.shuffle.labels")
+      .doc("Labels to identify the shuffle service")
+      .stringConf
+      .createOptional
+
+  val SPARK_SHUFFLE_SERVICE_HOST =
+    ConfigBuilder("spark.shuffle.service.host")
+      .doc("Host for Spark Shuffle Service")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val SHUFFLE_CLEANER_INTERVAL_S =
+    ConfigBuilder("spark.shuffle.cleaner.interval")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("30s")
+
   val CONTAINER_IMAGE =
     ConfigBuilder("spark.kubernetes.container.image")
       .doc("Container image to use for Spark containers. Individual container types " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala
new file mode 100644
index 0000000000000..08cb52dbfd48a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
+
+// TODO: Deduplicate code shared with the Mesos external shuffle service.
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes.
+ * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
+ */
+private[spark] class KubernetesShuffleBlockHandler(
+    transportConf: TransportConf,
+    cleanerIntervalS: Long)
+  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
+
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+  // Stores a map of app id to app state (timeout value and last heartbeat)
+  private val connectedApps = new ConcurrentHashMap[String, AppState]()
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId, appState) =>
+        val address = client.getSocketAddress
+        val timeout = appState.heartbeatTimeout
+        logInfo(s"Received registration request from app $appId (remote address $address, " +
+          s"heartbeat timeout $timeout ms).")
+        if (connectedApps.containsKey(appId)) {
+          logWarning(s"Received a registration request from app $appId, but it was already " +
+            s"registered")
+        }
+        connectedApps.put(appId, appState)
+        callback.onSuccess(ByteBuffer.allocate(0))
+      case Heartbeat(appId) =>
+        val address = client.getSocketAddress
+        Option(connectedApps.get(appId)) match {
+          case Some(existingAppState) =>
+            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+              s"address $address).")
+            existingAppState.lastHeartbeat = System.nanoTime()
+          case None =>
+            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+              s"address $address, appId '$appId').")
+        }
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  private object Heartbeat {
+    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+  }
+
+  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+  /** An extractor object for matching [[RegisterDriver]] message. */
+  private object RegisterDriverParam {
+    def unapply(r: RegisterDriver): Option[(String, AppState)] =
+      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+  }
+
+  private class CleanerThread extends Runnable {
+    override def run(): Unit = {
+      val now = System.nanoTime()
+      connectedApps.asScala.foreach { case (appId, appState) =>
+        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+          logInfo(s"Application $appId timed out. Removing shuffle files.")
+          connectedApps.remove(appId)
+          applicationRemoved(appId, true)
+        }
+      }
+    }
+  }
+}
+
+private[spark] class KubernetesExternalShuffleService(conf: SparkConf,
+    securityManager: SecurityManager)
+  extends ExternalShuffleService(conf, securityManager) {
+
+  protected override def newShuffleBlockHandler(
+      tConf: TransportConf): ExternalShuffleBlockHandler = {
+    val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S)
+    new KubernetesShuffleBlockHandler(tConf, cleanerIntervalS)
+  }
+}
+
+private[spark] object KubernetesExternalShuffleService extends Logging {
+  def main(args: Array[String]): Unit = {
+    ExternalShuffleService.main(args,
+      (conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index f5fae7cc8c470..dc61ee2d7e3cc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -24,6 +24,38 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 private[spark] object KubernetesUtils {
+  def parseKeyValuePairs(
+      maybeKeyValues: Option[String],
+      configKey: String,
+      keyValueType: String): Map[String, String] = {
+
+    maybeKeyValues.map(keyValues => {
+      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
+        keyValue.split("=", 2).toSeq match {
+          case Seq(k, v) =>
+            (k, v)
+          case _ =>
+            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
+              s" comma-separated list of key-value pairs, with format <key>=<value>." +
+              s" Got value: $keyValue. All values: $keyValues")
+        }
+      }).toMap
+    }).getOrElse(Map.empty[String, String])
+  }
+
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String,
+      configType: String): Map[String, String] = {
+    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+    fromPrefix.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a given $configType key, got key $key with" +
+            s" values $values")
+    }
+    fromPrefix.toMap
+  }
 
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
index ff5ad6673b309..fc75f36721c49 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -174,17 +174,17 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
    *         which may be empty if the user-specified credential is empty.
    */
   private def resolveSecretData(
-    userSpecifiedCredential: Option[String],
-    secretName: String): Map[String, String] = {
+      userSpecifiedCredential: Option[String],
+      secretName: String): Map[String, String] = {
     userSpecifiedCredential.map { valueBase64 =>
       Map(secretName -> valueBase64)
     }.getOrElse(Map.empty[String, String])
   }
 
   private def resolveSecretLocation(
-    mountedUserSpecified: Option[String],
-    valueMountedFromSubmitter: Option[String],
-    mountedCanonicalLocation: String): Option[String] = {
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      mountedCanonicalLocation: String): Option[String] = {
     mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
       mountedCanonicalLocation
     })
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala
new file mode 100644
index 0000000000000..3267c7a547806
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
+import org.apache.commons.io.FilenameUtils
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf}
+import org.apache.spark.util.Utils
+
+class ExternalShuffleLocalDirsFeatureStep(
+    conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) extends LocalDirsFeatureStep(conf) {
+
+  private val resolvedLocalDirs = Utils.getConfiguredLocalDirs(conf.sparkConf)
+
+  override def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
+    // TODO: Using hostPath for the local directory will also make it such that the
+    // other uses of the local directory - broadcasting and caching - will also write
+    // to the directory that the shuffle service is aware of. It would be better for
+    // these directories to be separate so that the lifetime of the non-shuffle scratch
+    // space is tied to an emptyDir instead of the hostPath. This requires a change in
+    // core Spark as well.
+    resolvedLocalDirs.zipWithIndex.map {
+      case (shuffleDir, shuffleDirIndex) =>
+        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
+        val volume = new VolumeBuilder()
+          .withName(volumeName)
+          .withNewHostPath(shuffleDir)
+          .build()
+        val volumeMount = new VolumeMountBuilder()
+          .withName(volumeName)
+          .withMountPath(shuffleDir)
+          .build()
+        (volume, volumeMount)
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
index be386e119d465..9d1bbb2ac970a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import java.nio.file.Paths
 import java.util.UUID
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 
 private[spark] class LocalDirsFeatureStep(
@@ -40,10 +39,10 @@ private[spark] class LocalDirsFeatureStep(
     .split(",")
   private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
 
-  override def configurePod(pod: SparkPod): SparkPod = {
+  def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
     val localDirVolumes = resolvedLocalDirs
       .zipWithIndex
-      .map { case (localDir, index) =>
+      .map { case (_, index) =>
         new VolumeBuilder()
           .withName(s"spark-local-dir-${index + 1}")
           .withNewEmptyDir()
@@ -59,9 +58,14 @@ private[spark] class LocalDirsFeatureStep(
           .withMountPath(localDirPath)
           .build()
       }
+    localDirVolumes.zip(localDirVolumeMounts)
+  }
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val volumesWithMounts = getDirVolumesWithMounts()
     val podWithLocalDirVolumes = new PodBuilder(pod.pod)
       .editSpec()
-        .addToVolumes(localDirVolumes: _*)
+        .addToVolumes(volumesWithMounts.map(_._1): _*)
         .endSpec()
       .build()
     val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
@@ -69,7 +73,7 @@ private[spark] class LocalDirsFeatureStep(
         .withName("SPARK_LOCAL_DIRS")
         .withValue(resolvedLocalDirs.mkString(","))
         .endEnv()
-      .addToVolumeMounts(localDirVolumeMounts: _*)
+      .addToVolumeMounts(volumesWithMounts.map(_._2): _*)
       .build()
     SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 9999c62c878df..7e6524cd15534 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -23,10 +23,13 @@ import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.Logging
+import org.apache.spark.deploy.k8s.features.{ExternalShuffleLocalDirsFeatureStep, LocalDirsFeatureStep}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 
@@ -79,15 +82,38 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val removedExecutorsCache = CacheBuilder.newBuilder()
       .expireAfterWrite(3, TimeUnit.MINUTES)
       .build[java.lang.Long, java.lang.Long]()
+
+    val kubernetesShuffleManager = if (sc.conf.get(
+        org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
+      val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClient(
+        SparkTransportConf.fromSparkConf(sc.conf, "shuffle"),
+        sc.env.securityManager,
+        sc.env.securityManager.isAuthenticationEnabled(),
+        sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+      Some(new KubernetesExternalShuffleManagerImpl(
+        sc.conf,
+        kubernetesClient,
+        kubernetesExternalShuffleClient))
+    } else None
+
+    val localDirsFeatureStepFactory: KubernetesConf[_ <: KubernetesRoleSpecificConf]
+        => LocalDirsFeatureStep = if (kubernetesShuffleManager.isDefined) {
+      new ExternalShuffleLocalDirsFeatureStep(_)
+    } else {
+      new LocalDirsFeatureStep(_)
+    }
+
+    val kubernetesExecutorBuilder = new KubernetesExecutorBuilder(
+      provideLocalDirsStep = localDirsFeatureStepFactory)
     val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
       sc.conf,
-      new KubernetesExecutorBuilder(),
+      kubernetesExecutorBuilder,
       kubernetesClient,
       snapshotsStore,
       removedExecutorsCache)
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
-      sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+      sc.conf, kubernetesExecutorBuilder, kubernetesClient, snapshotsStore, new SystemClock())
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
       snapshotsStore,
@@ -107,7 +133,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       executorPodsAllocator,
       executorPodsLifecycleEventHandler,
       podsWatchEventSource,
-      podsPollingEventSource)
+      podsPollingEventSource,
+      kubernetesShuffleManager)
   }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..f134bee60705b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -19,12 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.concurrent.ExecutorService
 
 import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -36,7 +39,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     podAllocator: ExecutorPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
-    pollEvents: ExecutorPodsPollingSnapshotSource)
+    pollEvents: ExecutorPodsPollingSnapshotSource,
+    kubernetesShuffleManager: Option[KubernetesExternalShuffleManager])
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
@@ -61,6 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     if (!Utils.isDynamicAllocationEnabled(conf)) {
       podAllocator.setTotalExpectedExecutors(initialExecutors)
     }
+    kubernetesShuffleManager.foreach(_.start(applicationId()))
     lifecycleEventHandler.start(this)
     podAllocator.start(applicationId())
     watchEvents.start(applicationId())
@@ -93,6 +98,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
       ThreadUtils.shutdown(requestExecutorsService)
     }
 
+    Utils.tryLogNonFatalError {
+      kubernetesShuffleManager.foreach(_.stop())
+    }
+
     Utils.tryLogNonFatalError {
       kubernetesClient.close()
     }
@@ -136,6 +145,46 @@ private[spark] class KubernetesClusterSchedulerBackend(
       // to be deleted eventually.
       addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
     }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+      new PartialFunction[Any, Unit]() {
+        override def isDefinedAt(msg: Any): Boolean = {
+          msg match {
+            case RetrieveSparkAppConfig =>
+              kubernetesShuffleManager.isDefined
+            case _ => false
+          }
+        }
+
+        override def apply(msg: Any): Unit = {
+          msg match {
+            case RetrieveSparkAppConfig if kubernetesShuffleManager.isDefined =>
+              val senderAddress = context.senderAddress
+              val allExecutorPods = kubernetesClient
+                .pods()
+                .withLabel(SPARK_APP_ID_LABEL, applicationId())
+                .list()
+                .getItems
+                .asScala
+              val executor = allExecutorPods.find(pod =>
+                pod.getStatus.getPodIP.equals(senderAddress.host) ||
+                  pod.getSpec.getHostname.equals(senderAddress.host))
+              if (executor.isDefined) {
+                val shuffleSpecificProperties = kubernetesShuffleManager.get
+                  .getShuffleServiceConfigurationForExecutor(executor.get)
+                val reply = SparkAppConfig(
+                  sparkProperties ++ shuffleSpecificProperties,
+                  SparkEnv.get.securityManager.getIOEncryptionKey(),
+                  fetchHadoopDelegationTokens())
+                context.reply(reply)
+              } else {
+                logError(s"Got RetrieveSparkAppConfig message from unknown executor" +
+                  s" address $senderAddress")
+              }
+          }
+        }
+      }.orElse(super.receiveAndReply(context))
+    }
   }
 
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 364b6fb367722..d48a66d373400 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
 
 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
+    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf])
       => BasicExecutorFeatureStep =
       new BasicExecutorFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
new file mode 100644
index 0000000000000..13745267a7832
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.internal.readiness.Readiness
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
+
+private[spark] trait KubernetesExternalShuffleManager {
+  def start(appId: String): Unit
+
+  def stop(): Unit
+
+  /**
+   * Returns the properties that should be applied for this executor pod, given that
+   * this executor will need to communicate with an external shuffle service.
+   *
+   * In practice, this seq will always have a size of 1, but since this method's semantics are that
+   * the returned values are key-value pairs to apply as properties, it is clearer to express
+   * this as a collection.
+   */
+  def getShuffleServiceConfigurationForExecutor(executorPod: Pod): Seq[(String, String)]
+}
+
+private[spark] class KubernetesExternalShuffleManagerImpl(
+    sparkConf: SparkConf,
+    client: KubernetesClient,
+    shuffleClient: KubernetesExternalShuffleClient)
+  extends KubernetesExternalShuffleManager with Logging {
+
+  private val shuffleNamespace = sparkConf.get(KUBERNETES_SHUFFLE_NAMESPACE)
+  private val shufflePodLabels = KubernetesUtils.parseKeyValuePairs(
+    sparkConf.get(KUBERNETES_SHUFFLE_LABELS),
+    KUBERNETES_SHUFFLE_LABELS.key,
+    "shuffle-labels")
+  if (shufflePodLabels.isEmpty) {
+    throw new SparkException(s"Dynamic allocation enabled " +
+      s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
+  }
+  private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val shufflePodCache = scala.collection.mutable.Map[String, String]()
+  private var watcher: Watch = _
+
+  override def start(appId: String): Unit = {
+    // seed the initial cache.
+    val pods = client.pods()
+      .inNamespace(shuffleNamespace)
+      .withLabels(shufflePodLabels.asJava)
+      .list()
+    pods.getItems.asScala.foreach {
+      pod =>
+        if (Readiness.isReady(pod)) {
+          addShufflePodToCache(pod)
+        } else {
+          logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " +
+            s"on node ${pod.getSpec.getNodeName}")
+        }
+    }
+
+    watcher = client
+      .pods()
+      .inNamespace(shuffleNamespace)
+      .withLabels(shufflePodLabels.asJava)
+      .watch(new Watcher[Pod] {
+        override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
+          action match {
+            case Action.DELETED | Action.ERROR =>
+              shufflePodCache.remove(p.getSpec.getNodeName)
+            case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) =>
+              addShufflePodToCache(p)
+          }
+        }
+        override def onClose(e: KubernetesClientException): Unit = {}
+      })
+    shuffleClient.init(appId)
+  }
+
+  private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized {
+    if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
+      val registeredPodName = shufflePodCache(pod.getSpec.getNodeName)
+      if (registeredPodName.equals(pod.getStatus.getPodIP)) {
+        logWarning(s"The same pod $registeredPodName is added again on ${pod.getSpec.getNodeName}")
+      } else {
+        throw new SparkException(s"Ambiguous specification of shuffle service pod. " +
+          s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
+          s"$registeredPodName on ${pod.getSpec.getNodeName}")
+      }
+    } else {
+      shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
+    }
+  }
+
+  override def stop(): Unit = {
+    watcher.close()
+    shuffleClient.close()
+  }
+
+  override def getShuffleServiceConfigurationForExecutor(executorPod: Pod)
+      : Seq[(String, String)] = {
+    val nodeName = executorPod.getSpec.getNodeName
+    val shufflePodIp = shufflePodCache.synchronized {
+      shufflePodCache.getOrElse(nodeName,
+        throw new SparkException(s"Unable to find shuffle pod on node $nodeName"))
+    }
+    // Inform the shuffle pod about this application so it can watch for its exit and clean up its shuffle files.
+    shuffleClient.registerDriverWithShuffleService(
+      shufflePodIp,
+      externalShufflePort,
+      sparkConf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
+      s"${sparkConf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
+      sparkConf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    )
+    Seq((SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12dbaf06..9e935944ae106 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -95,7 +95,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       podAllocator,
       lifecycleEventHandler,
       watchEvents,
-      pollEvents) {
+      pollEvents,
+      None) {
       override def applicationId(): String = TEST_SPARK_APP_ID
     }
   }
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 216e8fe31becb..12b72848c3637 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -38,7 +38,7 @@ fi
 
 SPARK_K8S_CMD="$1"
 case "$SPARK_K8S_CMD" in
-    driver | driver-py | driver-r | executor)
+    driver | driver-py | driver-r | executor | shuffle-service)
       shift 1
       ;;
     "")
@@ -123,7 +123,13 @@ case "$SPARK_K8S_CMD" in
       --hostname $SPARK_EXECUTOR_POD_IP
     )
     ;;
-
+  shuffle-service)
+    CMD=(
+      ${JAVA_HOME}/bin/java
+      -cp "$SPARK_CLASSPATH"
+      org.apache.spark.deploy.k8s.KubernetesExternalShuffleService
+    )
+    ;;
   *)
     echo "Unknown command: $SPARK_K8S_CMD" 1>&2
     exit 1
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala
new file mode 100644
index 0000000000000..c44b7c9c01e7d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model.extensions.DaemonSetBuilder
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+
+private[spark] trait DynamicAllocationTestSuite { k8sSuite: KubernetesSuite =>
+  import DynamicAllocationTestSuite._
+
+  private def startShuffleService(): Unit = {
+    val volume = new VolumeBuilder()
+      .withName(VOLUME_NAME)
+      .withNewHostPath(EXECUTOR_LOCAL_DIR)
+      .build()
+    val volumeMount = new VolumeMountBuilder()
+      .withName(VOLUME_NAME)
+      .withMountPath(EXECUTOR_LOCAL_DIR)
+      .build()
+    val daemonSet = new DaemonSetBuilder()
+      .withNewMetadata()
+        .withName(SHUFFLE_SERVICE_NAME)
+        .withLabels(SHUFFLE_SERVICE_LABEL)
+        .endMetadata()
+      .withNewSpec()
+        .withNewTemplate()
+          .withNewMetadata()
+            .withLabels(SHUFFLE_SERVICE_LABEL)
+            .endMetadata()
+          .withNewSpec()
+            .addNewContainer()
+              .withName("k8s-shuffle-service")
+              .withImage(image)
+              .withArgs("shuffle-service")
+              .withVolumeMounts(volumeMount)
+              .endContainer()
+            .withVolumes(volume)
+            .endSpec()
+          .endTemplate()
+        .endSpec()
+      .build()
+    kubernetesTestComponents.kubernetesClient
+      .extensions()
+      .daemonSets()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .create(daemonSet)
+  }
+
+  private def stopShuffleService(): Unit = {
+    kubernetesTestComponents.kubernetesClient
+      .extensions()
+      .daemonSets()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .withName(SHUFFLE_SERVICE_NAME)
+      .delete()
+  }
+
+  test("Run in cluster mode with dynamic allocation.", k8sTestTag) {
+    startShuffleService()
+    try {
+      runAndVerifyCompletion()
+    } finally {
+      stopShuffleService()
+    }
+  }
+
+  private def runAndVerifyCompletion(): Unit = {
+    sparkAppConf
+      .set("spark.local.dir", EXECUTOR_LOCAL_DIR)
+      .set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.maxExecutors", "4")
+      .set("spark.kubernetes.shuffle.labels", "app=k8s-shuffle-service")
+    runSparkApplicationAndVerifyCompletion(
+      containerLocalSparkDistroExamplesJar,
+      SPARK_GROUPBY_MAIN_CLASS,
+      Seq("RESULT: "),
+      Array("10", "400000", "2"),
+      doBasicDriverPodCheck,
+      doBasicExecutorPodCheck,
+      appLocator,
+      isJVM = true
+    )
+  }
+}
+
+private[spark] object DynamicAllocationTestSuite {
+  val SPARK_GROUPBY_MAIN_CLASS = "org.apache.spark.examples.SimpleSkewedGroupByTest"
+  val SHUFFLE_SERVICE_NAME = "k8s-external-shuffle-service"
+  val VOLUME_NAME = "shuffle-dir"
+  val EXECUTOR_LOCAL_DIR = "/tmp/spark-local"
+  val SHUFFLE_SERVICE_LABEL: util.Map[String, String] = Map("app" -> "k8s-shuffle-service").asJava
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c99a907f98d0a..03164ebe604ff 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,12 +38,12 @@ import org.apache.spark.internal.Logging
 
 private[spark] class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite
+  with PythonTestsSuite with ClientModeTestsSuite with DynamicAllocationTestSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
-  private var sparkHomeDir: Path = _
+  protected var sparkHomeDir: Path = _
   private var pyImage: String = _
   private var rImage: String = _
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 5615d6173eebd..fe8290257c17b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging {
         appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine += appArguments.appArgs.mkString(" ")
+      commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc18f33e..af85aae216784 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.io.File
 import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 859aa836a3157..6d94b9efd1d29 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -28,8 +28,7 @@ import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat}
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.ThreadUtils
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org