You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/07/13 21:06:36 UTC

[spark] branch master updated: [SPARK-32004][ALL] Drop references to slave

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90ac9f9  [SPARK-32004][ALL] Drop references to slave
90ac9f9 is described below

commit 90ac9f975bbb73e2f020a6c310e00fe1e71b6258
Author: Holden Karau <hk...@apple.com>
AuthorDate: Mon Jul 13 14:05:33 2020 -0700

    [SPARK-32004][ALL] Drop references to slave
    
    ### What changes were proposed in this pull request?
    
    This change replaces the word slave with alternatives matching the context.
    
    ### Why are the changes needed?
    
    There is no need to call things slave; we might as well use better, clearer names.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the output JSON does change. To allow backwards compatibility this is an additive change.
    The shell scripts for starting & stopping workers are renamed, and for backwards compatibility old scripts are added to call through to the new ones while printing a deprecation message to stderr.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #28864 from holdenk/SPARK-32004-drop-references-to-slave.
    
    Lead-authored-by: Holden Karau <hk...@apple.com>
    Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 conf/{slaves.template => workers.template}         |   0
 .../scala/org/apache/spark/HeartbeatReceiver.scala |   9 +-
 .../main/scala/org/apache/spark/SparkContext.scala |  16 +--
 .../apache/spark/api/java/JavaSparkContext.scala   |   6 +-
 .../org/apache/spark/deploy/JsonProtocol.scala     |  15 ++-
 .../org/apache/spark/internal/config/package.scala |   5 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |   4 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |   8 +-
 .../spark/scheduler/ExecutorLossReason.scala       |   4 +-
 .../org/apache/spark/scheduler/TaskScheduler.scala |   2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |   9 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  12 ++-
 .../cluster/StandaloneSchedulerBackend.scala       |   2 +-
 .../org/apache/spark/storage/BlockManager.scala    |  20 ++--
 .../apache/spark/storage/BlockManagerMaster.scala  |  22 ++--
 .../spark/storage/BlockManagerMasterEndpoint.scala |  73 +++++++------
 .../spark/storage/BlockManagerMessages.scala       |  26 ++---
 ...int.scala => BlockManagerStorageEndpoint.scala} |   6 +-
 .../scala/org/apache/spark/CheckpointSuite.scala   |   2 +-
 .../org/apache/spark/ContextCleanerSuite.scala     |  10 +-
 .../scala/org/apache/spark/DistributedSuite.scala  |  20 ++--
 .../apache/spark/ExternalShuffleServiceSuite.scala |  12 +--
 .../org/apache/spark/MapOutputTrackerSuite.scala   |  26 ++---
 .../apache/spark/broadcast/BroadcastSuite.scala    |  42 ++++----
 .../apache/spark/deploy/JsonProtocolSuite.scala    |   6 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   2 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |   8 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |   6 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  16 +--
 .../spark/storage/BlockManagerInfoSuite.scala      |   2 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  44 ++++----
 docs/configuration.md                              |   2 +-
 docs/job-scheduling.md                             |   2 +-
 docs/running-on-mesos.md                           |  16 +--
 docs/spark-standalone.md                           |  16 +--
 docs/streaming-programming-guide.md                |   2 +-
 .../apache/spark/deploy/mesos/ui/DriverPage.scala  |   4 +-
 .../spark/deploy/mesos/ui/MesosClusterPage.scala   |   6 +-
 .../spark/executor/MesosExecutorBackend.scala      |  12 +--
 .../cluster/mesos/MesosClusterScheduler.scala      |  30 +++---
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |  93 ++++++++---------
 .../mesos/MesosFineGrainedSchedulerBackend.scala   | 115 +++++++++++----------
 .../scheduler/cluster/mesos/MesosScheduler.scala   |  28 +++++
 .../cluster/mesos/MesosSchedulerBackendUtil.scala  |  20 +++-
 .../cluster/mesos/MesosSchedulerUtils.scala        |  18 +---
 .../cluster/mesos/MesosClusterSchedulerSuite.scala |   6 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |  12 +--
 .../spark/scheduler/cluster/mesos/Utils.scala      |  11 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   7 +-
 sbin/decommission-slave.sh                         |  40 +------
 ...ecommission-slave.sh => decommission-worker.sh} |   8 +-
 sbin/slaves.sh                                     |  86 +--------------
 sbin/spark-daemons.sh                              |   4 +-
 sbin/start-all.sh                                  |   4 +-
 sbin/start-slave.sh                                |  75 +-------------
 sbin/start-slaves.sh                               |  29 +-----
 sbin/{start-slave.sh => start-worker.sh}           |   6 +-
 sbin/{start-slaves.sh => start-workers.sh}         |  14 +--
 sbin/stop-all.sh                                   |   6 +-
 sbin/stop-slave.sh                                 |  27 +----
 sbin/stop-slaves.sh                                |  11 +-
 sbin/{stop-slave.sh => stop-worker.sh}             |   8 +-
 sbin/{stop-slaves.sh => stop-workers.sh}           |   2 +-
 sbin/{slaves.sh => workers.sh}                     |  45 +++++---
 .../spark/sql/hive/thriftserver/SparkSQLEnv.scala  |   2 +-
 .../org/apache/spark/sql/hive/TableReader.scala    |   4 +-
 .../streaming/api/java/JavaStreamingContext.scala  |   6 +-
 .../streaming/scheduler/ReceiverTracker.scala      |   2 +-
 .../spark/streaming/util/RawTextHelper.scala       |   2 +-
 69 files changed, 526 insertions(+), 690 deletions(-)

diff --git a/conf/slaves.template b/conf/workers.template
similarity index 100%
rename from conf/slaves.template
rename to conf/workers.template
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 2ac72e6..c99698f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+  private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
@@ -88,10 +88,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   require(checkTimeoutIntervalMs <= executorTimeoutMs,
     s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
-      s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}.")
+      s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
   require(executorHeartbeatIntervalMs <= executorTimeoutMs,
     s"${config.EXECUTOR_HEARTBEAT_INTERVAL.key} should be less than or " +
-      s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}")
+      s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}")
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
@@ -218,7 +218,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
             sc.schedulerBackend match {
               case backend: CoarseGrainedSchedulerBackend =>
                 backend.driverEndpoint.send(RemoveExecutor(executorId,
-                  SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+                  ExecutorProcessLost(
+                    s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
 
               // LocalSchedulerBackend is used locally and only has one single executor
               case _: LocalSchedulerBackend =>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5078ef4..06abc05 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1732,7 +1732,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def version: String = SPARK_VERSION
 
   /**
-   * Return a map from the slave to the max memory available for caching and the remaining
+   * Return a map from the block manager to the max memory available for caching and the remaining
    * memory available for caching.
    */
   def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
@@ -2830,14 +2830,14 @@ object SparkContext extends Logging {
         scheduler.initialize(backend)
         (backend, scheduler)
 
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        checkResourcesPerTask(coresPerSlave.toInt)
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
-        val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (sc.executorMemory > memoryPerSlaveInt) {
+      case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
+        checkResourcesPerTask(coresPerWorker.toInt)
+        // Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
+        val memoryPerWorkerInt = memoryPerWorker.toInt
+        if (sc.executorMemory > memoryPerWorkerInt) {
           throw new SparkException(
             "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
-              memoryPerSlaveInt, sc.executorMemory))
+              memoryPerWorkerInt, sc.executorMemory))
         }
 
         // For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
@@ -2850,7 +2850,7 @@ object SparkContext extends Logging {
 
         val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
+          numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
         val masterUrls = localCluster.start()
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 347f59f..39eb1ee 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -74,7 +74,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /**
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
    *                or an HDFS, HTTP, HTTPS, or FTP URL.
    */
@@ -84,7 +84,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /**
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
@@ -94,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /**
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 6c3276c..17733d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -90,11 +90,12 @@ private[deploy] object JsonProtocol {
    *         `name` the description of the application
    *         `cores` total cores granted to the application
    *         `user` name of the user who submitted the application
-   *         `memoryperslave` minimal memory in MB required to each executor
-   *         `resourcesperslave` minimal resources required to each executor
+   *         `memoryperexecutor` minimal memory in MB required to each executor
+   *         `resourcesperexecutor` minimal resources required to each executor
    *         `submitdate` time in Date that the application is submitted
    *         `state` state of the application, see [[ApplicationState]]
    *         `duration` time in milliseconds that the application has been running
+   * For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
    */
   def writeApplicationInfo(obj: ApplicationInfo): JObject = {
     ("id" -> obj.id) ~
@@ -102,7 +103,10 @@ private[deploy] object JsonProtocol {
     ("name" -> obj.desc.name) ~
     ("cores" -> obj.coresGranted) ~
     ("user" -> obj.desc.user) ~
+    ("memoryperexecutor" -> obj.desc.memoryPerExecutorMB) ~
     ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
+    ("resourcesperexecutor" -> obj.desc.resourceReqsPerExecutor
+      .toList.map(writeResourceRequirement)) ~
     ("resourcesperslave" -> obj.desc.resourceReqsPerExecutor
       .toList.map(writeResourceRequirement)) ~
     ("submitdate" -> obj.submitDate.toString) ~
@@ -117,14 +121,17 @@ private[deploy] object JsonProtocol {
    * @return a Json object containing the following fields:
    *         `name` the description of the application
    *         `cores` max cores that can be allocated to the application, 0 means unlimited
-   *         `memoryperslave` minimal memory in MB required to each executor
-   *         `resourcesperslave` minimal resources required to each executor
+   *         `memoryperexecutor` minimal memory in MB required to each executor
+   *         `resourcesperexecutor` minimal resources required to each executor
    *         `user` name of the user who submitted the application
    *         `command` the command string used to submit the application
+   * For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
    */
   def writeApplicationDescription(obj: ApplicationDescription): JObject = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores.getOrElse(0)) ~
+    ("memoryperexecutor" -> obj.memoryPerExecutorMB) ~
+    ("resourcesperexecutor" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
     ("memoryperslave" -> obj.memoryPerExecutorMB) ~
     ("resourcesperslave" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
     ("user" -> obj.user) ~
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ee437c6..ca75a19 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -459,9 +459,10 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
-  private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
-    ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
+  private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT =
+    ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs")
       .version("0.7.0")
+      .withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 9742d12..d5f2111 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -81,7 +81,7 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
  *   variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
- *   Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ *   Otherwise, a new JobConf will be created on each executor using the enclosed Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
  *     creates.
  * @param inputFormatClass Storage format of the data to be read.
@@ -140,7 +140,7 @@ class HadoopRDD[K, V](
 
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
-  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  // Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
     if (shouldCloneJobConf) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 37f9e0b..cb024d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1912,9 +1912,9 @@ private[spark] class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
-   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
-   * presume all shuffle data related to this executor to be lost.
+   * executor serves its own blocks (i.e., we're not using external shuffle), the entire executor
+   * process is lost (likely including the shuffle service), or a FetchFailed occurred, in which
+   * case we presume all shuffle data related to this executor to be lost.
    *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
@@ -2273,7 +2273,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
 
     case ExecutorLost(execId, reason) =>
       val workerLost = reason match {
-        case SlaveLost(_, true) => true
+        case ExecutorProcessLost(_, true) => true
         case _ => false
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index ee31093..4141ed7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark.executor.ExecutorExitCode
 
 /**
- * Represents an explanation for an executor or whole slave failing or exiting.
+ * Represents an explanation for an executor or whole process failing or exiting.
  */
 private[spark]
 class ExecutorLossReason(val message: String) extends Serializable {
@@ -56,7 +56,7 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
  * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
  */
 private[spark]
-case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
+case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
   extends ExecutorLossReason(_message)
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index e9e638a..08f9f3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -45,7 +45,7 @@ private[spark] trait TaskScheduler {
 
   // Invoked after system has successfully initialized (typically in spark context).
   // Yarn uses this to bootstrap allocation of resources based on preferred locations,
-  // wait for slave registrations, etc.
+  // wait for executor registrations, etc.
   def postStartHook(): Unit = { }
 
   // Disconnect from the cluster.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 45cb5e5..12bd932 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -526,14 +526,14 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
+   * Called by cluster manager to offer resources on workers. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
   def resourceOffers(
       offers: IndexedSeq[WorkerOffer],
       isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
-    // Mark each slave as alive and remember its hostname
+    // Mark each worker as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false
     for (o <- offers) {
@@ -765,7 +765,8 @@ private[spark] class TaskSchedulerImpl(
               })
               if (executorIdToRunningTaskIds.contains(execId)) {
                 reason = Some(
-                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
+                  ExecutorProcessLost(
+                    s"Task $tid was lost, so marking the executor as lost as well."))
                 removeExecutor(execId, reason.get)
                 failedExecutor = Some(execId)
               }
@@ -936,7 +937,7 @@ private[spark] class TaskSchedulerImpl(
 
           case None =>
             // We may get multiple executorLost() calls with different loss reasons. For example,
-            // one may be triggered by a dropped connection from the slave while another may be a
+            // one may be triggered by a dropped connection from the worker while another may be a
             // report of executor termination from Mesos. We produce log messages for both so we
             // eventually report the termination reason.
             logError(s"Lost an executor $executorId (already removed): $reason")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e4f4000..6b9b4d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -316,9 +316,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       addressToExecutorId
         .get(remoteAddress)
-        .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
-          "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
-          "messages.")))
+        .foreach(removeExecutor(_,
+          ExecutorProcessLost("Remote RPC client disassociated. Likely due to " +
+            "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
+            "messages.")))
     }
 
     // Make fake resource offers on just one executor
@@ -382,7 +383,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
-    // Remove a disconnected slave from the cluster
+    // Remove a disconnected executor from the cluster
     private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
       logDebug(s"Asked to remove executor $executorId with reason $reason")
       executorDataMap.get(executorId) match {
@@ -556,7 +557,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Remove all the lingering executors that should be removed but not yet. The reason might be
     // because (1) disconnected event is not yet received; (2) executors die silently.
     executors.foreach { eid =>
-      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
+      removeExecutor(eid,
+        ExecutorProcessLost("Stale executor after cluster manager re-registered."))
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 42c4646..ec1299a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -168,7 +168,7 @@ private[spark] class StandaloneSchedulerBackend(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => SlaveLost(message, workerLost = workerLost)
+      case None => ExecutorProcessLost(message, workerLost = workerLost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e0478ad..6eec288 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -226,9 +226,9 @@ private[spark] class BlockManager(
   private val maxFailuresBeforeLocationRefresh =
     conf.get(config.BLOCK_FAILURES_BEFORE_LOCATION_REFRESH)
 
-  private val slaveEndpoint = rpcEnv.setupEndpoint(
+  private val storageEndpoint = rpcEnv.setupEndpoint(
     "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
-    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
+    new BlockManagerStorageEndpoint(rpcEnv, this, mapOutputTracker))
 
   // Pending re-registration action being executed asynchronously or null if none is pending.
   // Accesses should synchronize on asyncReregisterLock.
@@ -465,7 +465,7 @@ private[spark] class BlockManager(
       diskBlockManager.localDirsString,
       maxOnHeapMemory,
       maxOffHeapMemory,
-      slaveEndpoint)
+      storageEndpoint)
 
     blockManagerId = if (idFromMaster != null) idFromMaster else id
 
@@ -543,8 +543,8 @@ private[spark] class BlockManager(
    * an executor crash.
    *
    * This function deliberately fails silently if the master returns false (indicating that
-   * the slave needs to re-register). The error condition will be detected again by the next
-   * heart beat attempt or new block registration and another try to re-register all blocks
+   * the storage endpoint needs to re-register). The error condition will be detected again by the
+   * next heart beat attempt or new block registration and another try to re-register all blocks
    * will be made then.
    */
   private def reportAllBlocks(): Unit = {
@@ -568,7 +568,7 @@ private[spark] class BlockManager(
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
     master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, slaveEndpoint)
+      maxOffHeapMemory, storageEndpoint)
     reportAllBlocks()
   }
 
@@ -718,7 +718,7 @@ private[spark] class BlockManager(
    *
    * droppedMemorySize exists to account for when the block is dropped from memory to disk (so
    * it is still valid). This ensures that update in master will compensate for the increase in
-   * memory on slave.
+   * memory on the storage endpoint.
    */
   private def reportBlockStatus(
       blockId: BlockId,
@@ -736,7 +736,7 @@ private[spark] class BlockManager(
   /**
    * Actually send a UpdateBlockInfo message. Returns the master's response,
    * which will be true if the block was successfully recorded and false if
-   * the slave needs to re-register.
+   * the storage endpoint needs to re-register.
    */
   private def tryToReportBlockStatus(
       blockId: BlockId,
@@ -934,7 +934,7 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
 
     // Because all the remote blocks are registered in driver, it is not necessary to ask
-    // all the slave executors to get block status.
+    // all the storage endpoints to get block status.
     val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host)
     if (locationsAndStatusOption.isEmpty) {
       logDebug(s"Block $blockId is unknown by block manager master")
@@ -1960,7 +1960,7 @@ private[spark] class BlockManager(
     }
     remoteBlockTempFileManager.stop()
     diskBlockManager.stop()
-    rpcEnv.stop(slaveEndpoint)
+    rpcEnv.stop(storageEndpoint)
     blockInfoManager.clear()
     memoryStore.clear()
     futureExecutionContext.shutdownNow()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index f571e42..93492cc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -71,10 +71,10 @@ class BlockManagerMaster(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $id")
     val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
+      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
   }
@@ -128,7 +128,7 @@ class BlockManagerMaster(
   }
 
   /**
-   * Remove a block from the slaves that have it. This can only be used to remove
+   * Remove a block from the storage endpoints that have it. This can only be used to remove
    * blocks that the driver knows about.
    */
   def removeBlock(blockId: BlockId): Unit = {
@@ -193,14 +193,14 @@ class BlockManagerMaster(
    * Return the block's status on all block managers, if any. NOTE: This is a
    * potentially expensive operation and should only be used for testing.
    *
-   * If askSlaves is true, this invokes the master to query each block manager for the most
-   * updated block statuses. This is useful when the master is not informed of the given block
+   * If askStorageEndpoints is true, this invokes the master to query each block manager for the
+   * most updated block statuses. This is useful when the master is not informed of the given block
    * by all block managers.
    */
   def getBlockStatus(
       blockId: BlockId,
-      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
-    val msg = GetBlockStatus(blockId, askSlaves)
+      askStorageEndpoints: Boolean = true): Map[BlockManagerId, BlockStatus] = {
+    val msg = GetBlockStatus(blockId, askStorageEndpoints)
     /*
      * To avoid potential deadlocks, the use of Futures is necessary, because the master endpoint
      * should not block on waiting for a block manager, which can in turn be waiting for the
@@ -229,14 +229,14 @@ class BlockManagerMaster(
    * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
    * is a potentially expensive operation and should only be used for testing.
    *
-   * If askSlaves is true, this invokes the master to query each block manager for the most
-   * updated block statuses. This is useful when the master is not informed of the given block
+   * If askStorageEndpoints is true, this invokes the master to query each block manager for the
+   * most updated block statuses. This is useful when the master is not informed of the given block
    * by all block managers.
    */
   def getMatchingBlockIds(
       filter: BlockId => Boolean,
-      askSlaves: Boolean): Seq[BlockId] = {
-    val msg = GetMatchingBlockIds(filter, askSlaves)
+      askStorageEndpoints: Boolean): Seq[BlockId] = {
+    val msg = GetMatchingBlockIds(filter, askStorageEndpoints)
     val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
     timeout.awaitResult(future)
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index f90216b..2a48177 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
 /**
  * BlockManagerMasterEndpoint is an [[IsolatedRpcEndpoint]] on the master node to track statuses
- * of all slaves' block managers.
+ * of all the storage endpoints' block managers.
  */
 private[spark]
 class BlockManagerMasterEndpoint(
@@ -101,8 +101,8 @@ class BlockManagerMasterEndpoint(
     RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
-      context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
+    case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
+      context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -135,14 +135,14 @@ class BlockManagerMasterEndpoint(
     case GetStorageStatus =>
       context.reply(storageStatus)
 
-    case GetBlockStatus(blockId, askSlaves) =>
-      context.reply(blockStatus(blockId, askSlaves))
+    case GetBlockStatus(blockId, askStorageEndpoints) =>
+      context.reply(blockStatus(blockId, askStorageEndpoints))
 
     case IsExecutorAlive(executorId) =>
       context.reply(blockManagerIdByExecutor.contains(executorId))
 
-    case GetMatchingBlockIds(filter, askSlaves) =>
-      context.reply(getMatchingBlockIds(filter, askSlaves))
+    case GetMatchingBlockIds(filter, askStorageEndpoints) =>
+      context.reply(getMatchingBlockIds(filter, askStorageEndpoints))
 
     case RemoveRdd(rddId) =>
       context.reply(removeRdd(rddId))
@@ -219,14 +219,14 @@ class BlockManagerMasterEndpoint(
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
     // First remove the metadata for the given RDD, and then asynchronously remove the blocks
-    // from the slaves.
+    // from the storage endpoints.
 
-    // The message sent to the slaves to remove the RDD
+    // The message sent to the storage endpoints to remove the RDD
     val removeMsg = RemoveRdd(rddId)
 
     // Find all blocks for the given RDD, remove the block from both blockLocations and
     // the blockManagerInfo that is tracking the blocks and create the futures which asynchronously
-    // remove the blocks from slaves and gives back the number of removed blocks
+    // remove the blocks from storage endpoints and gives back the number of removed blocks
     val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
     val blocksToDeleteByShuffleService =
       new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]
@@ -255,7 +255,7 @@ class BlockManagerMasterEndpoint(
       }
     }
     val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
-      bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
+      bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
       }
@@ -276,13 +276,12 @@ class BlockManagerMasterEndpoint(
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
-
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveEndpoint.ask[Boolean](removeMsg).recover {
+        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
           // use false as default value means no shuffle data were removed
           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
         }
@@ -301,7 +300,7 @@ class BlockManagerMasterEndpoint(
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
-      bm.slaveEndpoint.ask[Int](removeMsg).recover {
+      bm.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("broadcast", broadcastId.toString, bm.blockManagerId, 0)
       }
@@ -343,7 +342,7 @@ class BlockManagerMasterEndpoint(
         blockManagerInfo.get(candidateBMId).foreach { bm =>
           val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
           val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-          bm.slaveEndpoint.ask[Boolean](replicateMsg)
+          bm.storageEndpoint.ask[Boolean](replicateMsg)
         }
       }
     }
@@ -361,14 +360,14 @@ class BlockManagerMasterEndpoint(
   /**
    * Decommission the given Seq of blockmanagers
    *    - Adds these block managers to decommissioningBlockManagerSet Set
-   *    - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
+   *    - Sends the DecommissionBlockManager message to each of the [[BlockManagerStorageEndpoint]]
    */
   def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
     val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
     val futures = newBlockManagersToDecommission.map { blockManagerId =>
       decommissioningBlockManagerSet.add(blockManagerId)
       val info = blockManagerInfo(blockManagerId)
-      info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
+      info.storageEndpoint.ask[Unit](DecommissionBlockManager)
     }
     Future.sequence{ futures.toSeq }
   }
@@ -391,7 +390,7 @@ class BlockManagerMasterEndpoint(
     }.toSeq
   }
 
-  // Remove a block from the slaves that have it. This can only be used to remove
+  // Remove a block from the workers that have it. This can only be used to remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId): Unit = {
     val locations = blockLocations.get(blockId)
@@ -399,10 +398,10 @@ class BlockManagerMasterEndpoint(
       locations.foreach { blockManagerId: BlockManagerId =>
         val blockManager = blockManagerInfo.get(blockManagerId)
         blockManager.foreach { bm =>
-          // Remove the block from the slave's BlockManager.
+          // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
+          bm.storageEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
             // use false as default value means no blocks were removed
             handleBlockRemovalFailure("block", blockId.toString, bm.blockManagerId, false)
           }
@@ -429,13 +428,13 @@ class BlockManagerMasterEndpoint(
    * Return the block's status for all block managers, if any. NOTE: This is a
    * potentially expensive operation and should only be used for testing.
    *
-   * If askSlaves is true, the master queries each block manager for the most updated block
-   * statuses. This is useful when the master is not informed of the given block by all block
+   * If askStorageEndpoints is true, the master queries each block manager for the most updated
+   * block statuses. This is useful when the master is not informed of the given block by all block
    * managers.
    */
   private def blockStatus(
       blockId: BlockId,
-      askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
+      askStorageEndpoints: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
     val getBlockStatus = GetBlockStatus(blockId)
     /*
      * Rather than blocking on the block status query, master endpoint should simply return
@@ -444,8 +443,8 @@ class BlockManagerMasterEndpoint(
      */
     blockManagerInfo.values.map { info =>
       val blockStatusFuture =
-        if (askSlaves) {
-          info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
+        if (askStorageEndpoints) {
+          info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
         } else {
           Future { info.getStatus(blockId) }
         }
@@ -457,19 +456,19 @@ class BlockManagerMasterEndpoint(
    * Return the ids of blocks present in all the block managers that match the given filter.
    * NOTE: This is a potentially expensive operation and should only be used for testing.
    *
-   * If askSlaves is true, the master queries each block manager for the most updated block
-   * statuses. This is useful when the master is not informed of the given block by all block
+   * If askStorageEndpoints is true, the master queries each block manager for the most updated
+   * block statuses. This is useful when the master is not informed of the given block by all block
    * managers.
    */
   private def getMatchingBlockIds(
       filter: BlockId => Boolean,
-      askSlaves: Boolean): Future[Seq[BlockId]] = {
+      askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
       blockManagerInfo.values.map { info =>
         val future =
-          if (askSlaves) {
-            info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
+          if (askStorageEndpoints) {
+            info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
           } else {
             Future { info.blocks.asScala.keys.filter(filter).toSeq }
           }
@@ -492,7 +491,7 @@ class BlockManagerMasterEndpoint(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef): BlockManagerId = {
     // the dummy id is not expected to contain the topology information.
     // we get that info here and respond back with a more fleshed out block manager id
     val id = BlockManagerId(
@@ -527,7 +526,7 @@ class BlockManagerMasterEndpoint(
         }
 
       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
-        maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
+        maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
     }
     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
         Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@@ -581,7 +580,7 @@ class BlockManagerMasterEndpoint(
       }
     }
 
-    // Remove the block from master tracking if it has been removed on all slaves.
+    // Remove the block from master tracking if it has been removed on all endpoints.
     if (locations.size == 0) {
       blockLocations.remove(blockId)
     }
@@ -642,14 +641,14 @@ class BlockManagerMasterEndpoint(
   }
 
   /**
-   * Returns an [[RpcEndpointRef]] of the [[BlockManagerSlaveEndpoint]] for sending RPC messages.
+   * Returns an [[RpcEndpointRef]] of the [[BlockManagerStorageEndpoint]] for sending RPC messages.
    */
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
       info <- blockManagerInfo.get(blockManagerId)
     ) yield {
-      info.slaveEndpoint
+      info.storageEndpoint
     }
   }
 
@@ -673,7 +672,7 @@ private[spark] class BlockManagerInfo(
     timeMs: Long,
     val maxOnHeapMem: Long,
     val maxOffHeapMem: Long,
-    val slaveEndpoint: RpcEndpointRef,
+    val storageEndpoint: RpcEndpointRef,
     val externalShuffleServiceBlockStatus: Option[JHashMap[BlockId, BlockStatus]])
   extends Logging {
 
@@ -707,7 +706,7 @@ private[spark] class BlockManagerInfo(
     var originalLevel: StorageLevel = StorageLevel.NONE
 
     if (blockExists) {
-      // The block exists on the slave already.
+      // The block exists on the storage endpoint already.
       val blockStatus: BlockStatus = _blocks.get(blockId)
       originalLevel = blockStatus.storageLevel
       originalMemSize = blockStatus.memSize
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 7d4f2ff..bbc076c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -24,37 +24,37 @@ import org.apache.spark.util.Utils
 
 private[spark] object BlockManagerMessages {
   //////////////////////////////////////////////////////////////////////////////////
-  // Messages from the master to slaves.
+  // Messages from the master to storage endpoints.
   //////////////////////////////////////////////////////////////////////////////////
-  sealed trait ToBlockManagerSlave
+  sealed trait ToBlockManagerMasterStorageEndpoint
 
-  // Remove a block from the slaves that have it. This can only be used to remove
+  // Remove a block from the storage endpoints that have it. This can only be used to remove
   // blocks that the master knows about.
-  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
+  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerMasterStorageEndpoint
 
   // Replicate blocks that were lost due to executor failure
   case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
-    extends ToBlockManagerSlave
+    extends ToBlockManagerMasterStorageEndpoint
 
-  case object DecommissionBlockManager extends ToBlockManagerSlave
+  case object DecommissionBlockManager extends ToBlockManagerMasterStorageEndpoint
 
   // Remove all blocks belonging to a specific RDD.
-  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+  case class RemoveRdd(rddId: Int) extends ToBlockManagerMasterStorageEndpoint
 
   // Remove all blocks belonging to a specific shuffle.
-  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave
+  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerMasterStorageEndpoint
 
   // Remove all blocks belonging to a specific broadcast.
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
-    extends ToBlockManagerSlave
+    extends ToBlockManagerMasterStorageEndpoint
 
   /**
    * Driver to Executor message to trigger a thread dump.
    */
-  case object TriggerThreadDump extends ToBlockManagerSlave
+  case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint
 
   //////////////////////////////////////////////////////////////////////////////////
-  // Messages from slaves to the master.
+  // Messages from storage endpoints to the master.
   //////////////////////////////////////////////////////////////////////////////////
   sealed trait ToBlockManagerMaster
 
@@ -132,10 +132,10 @@ private[spark] object BlockManagerMessages {
   case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
     extends ToBlockManagerMaster
 
-  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
+  case class GetBlockStatus(blockId: BlockId, askStorageEndpoints: Boolean = true)
     extends ToBlockManagerMaster
 
-  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
+  case class GetMatchingBlockIds(filter: BlockId => Boolean, askStorageEndpoints: Boolean = true)
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
similarity index 94%
rename from core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
rename to core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index a3a7149..a69bebc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -27,17 +27,17 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * An RpcEndpoint to take commands from the master to execute options. For example,
- * this is used to remove blocks from the slave's BlockManager.
+ * this is used to remove blocks from the storage endpoint's BlockManager.
  */
 private[storage]
-class BlockManagerSlaveEndpoint(
+class BlockManagerStorageEndpoint(
     override val rpcEnv: RpcEnv,
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
   extends IsolatedRpcEndpoint with Logging {
 
   private val asyncThreadPool =
-    ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool", 100)
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
   private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
 
   // Operations that involve removing blocks may be slow and should be done asynchronously
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index a69381d..21090e9 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -194,7 +194,7 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
   /**
    * Serialize and deserialize an object. This is useful to verify the objects
    * contents after deserialization (e.g., the contents of an RDD split after
-   * it is sent to a slave along with a task)
+   * it is sent to an executor along with a task)
    */
   protected def serializeDeserialize[T](obj: T): T = {
     val bytes = Utils.serialize(obj)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 7a95ea0..81530a8 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -309,7 +309,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     assert(sc.env.blockManager.master.getMatchingBlockIds({
       case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
       case _ => false
-    }, askSlaves = true).isEmpty)
+    }, askStorageEndpoints = true).isEmpty)
   }
 
   test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
@@ -349,7 +349,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     assert(sc.env.blockManager.master.getMatchingBlockIds({
       case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
       case _ => false
-    }, askSlaves = true).isEmpty)
+    }, askStorageEndpoints = true).isEmpty)
   }
 }
 
@@ -528,7 +528,7 @@ class CleanerTester(
     blockManager.master.getMatchingBlockIds( _ match {
       case RDDBlockId(`rddId`, _) => true
       case _ => false
-    }, askSlaves = true)
+    }, askStorageEndpoints = true)
   }
 
   private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
@@ -536,14 +536,14 @@ class CleanerTester(
       case ShuffleBlockId(`shuffleId`, _, _) => true
       case ShuffleIndexBlockId(`shuffleId`, _, _) => true
       case _ => false
-    }, askSlaves = true)
+    }, askStorageEndpoints = true)
   }
 
   private def getBroadcastBlocks(broadcastId: Long): Seq[BlockId] = {
     blockManager.master.getMatchingBlockIds( _ match {
       case BroadcastBlockId(`broadcastId`, _) => true
       case _ => false
-    }, askSlaves = true)
+    }, askStorageEndpoints = true)
   }
 
   private def blockManager = sc.env.blockManager
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4d157b9..2786280 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -45,11 +45,11 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // this test will hang. Correct behavior is that executors don't crash but fail tasks
     // and the scheduler throws a SparkException.
 
-    // numSlaves must be less than numPartitions
-    val numSlaves = 3
+    // numWorkers must be less than numPartitions
+    val numWorkers = 3
     val numPartitions = 10
 
-    sc = new SparkContext("local-cluster[%s,1,1024]".format(numSlaves), "test")
+    sc = new SparkContext("local-cluster[%s,1,1024]".format(numWorkers), "test")
     val data = sc.parallelize(1 to 100, numPartitions).
       map(x => throw new NotSerializableExn(new NotSerializableClass))
     intercept[SparkException] {
@@ -69,10 +69,10 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     )
 
     masterStrings.foreach {
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        assert(numSlaves.toInt == 2)
-        assert(coresPerSlave.toInt == 1)
-        assert(memoryPerSlave.toInt == 1024)
+      case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
+        assert(numWorkers.toInt == 2)
+        assert(coresPerWorker.toInt == 1)
+        assert(memoryPerWorker.toInt == 1024)
     }
   }
 
@@ -227,7 +227,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     assert(data.count() === size)
     assert(data.count() === size)
     // ensure only a subset of partitions were cached
-    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
+    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD,
+      askStorageEndpoints = true)
     assert(rddBlocks.size === 0, s"expected no RDD blocks, found ${rddBlocks.size}")
   }
 
@@ -244,7 +245,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     assert(data.count() === size)
     assert(data.count() === size)
     // ensure only a subset of partitions were cached
-    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
+    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD,
+      askStorageEndpoints = true)
     assert(rddBlocks.size > 0, "no RDD blocks found")
     assert(rddBlocks.size < numPartitions, s"too many RDD blocks found, expected <$numPartitions")
   }
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index c217419..65391db 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -72,12 +72,12 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
 
-    // In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
-    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+    // In a slow machine, one executor may register hundreds of milliseconds ahead of the other one.
+    // If we don't wait for all executors, it's possible that only one executor runs all jobs. Then
     // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
     // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
     // In this case, we won't receive FetchFailed. And it will make this test fail.
-    // Therefore, we should wait until all slaves are up
+    // Therefore, we should wait until all executors are up
     TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 
     val rdd = sc.parallelize(0 until 1000, 10)
@@ -109,12 +109,12 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
 
-    // In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
-    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+    // In a slow machine, one executor may register hundreds of milliseconds ahead of the other one.
+    // If we don't wait for all executors, it's possible that only one executor runs all jobs. Then
     // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
     // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
     // In this case, we won't receive FetchFailed. And it will make this test fail.
-    // Therefore, we should wait until all slaves are up
+    // Therefore, we should wait until all executors are up
     TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 
     val rdd = sc.parallelize(0 until 1000, 10)
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 630ffd9..b5b68f6 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -136,21 +136,21 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
       new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
 
-    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
-    val slaveTracker = new MapOutputTrackerWorker(conf)
-    slaveTracker.trackerEndpoint =
-      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+    val mapWorkerRpcEnv = createRpcEnv("spark-worker", hostname, 0, new SecurityManager(conf))
+    val mapWorkerTracker = new MapOutputTrackerWorker(conf)
+    mapWorkerTracker.trackerEndpoint =
+      mapWorkerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
 
     masterTracker.registerShuffle(10, 1)
-    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    mapWorkerTracker.updateEpoch(masterTracker.getEpoch)
     // This is expected to fail because no outputs have been registered for the shuffle.
-    intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }
+    intercept[FetchFailedException] { mapWorkerTracker.getMapSizesByExecutorId(10, 0) }
 
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("a", "hostA", 1000), Array(1000L), 5))
-    slaveTracker.updateEpoch(masterTracker.getEpoch)
-    assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
+    mapWorkerTracker.updateEpoch(masterTracker.getEpoch)
+    assert(mapWorkerTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000),
         ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0)))))
     assert(0 == masterTracker.getNumCachedSerializedBroadcast)
@@ -158,17 +158,17 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val masterTrackerEpochBeforeLossOfMapOutput = masterTracker.getEpoch
     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
     assert(masterTracker.getEpoch > masterTrackerEpochBeforeLossOfMapOutput)
-    slaveTracker.updateEpoch(masterTracker.getEpoch)
-    intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }
+    mapWorkerTracker.updateEpoch(masterTracker.getEpoch)
+    intercept[FetchFailedException] { mapWorkerTracker.getMapSizesByExecutorId(10, 0) }
 
     // failure should be cached
-    intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }
+    intercept[FetchFailedException] { mapWorkerTracker.getMapSizesByExecutorId(10, 0) }
     assert(0 == masterTracker.getNumCachedSerializedBroadcast)
 
     masterTracker.stop()
-    slaveTracker.stop()
+    mapWorkerTracker.stop()
     rpcEnv.shutdown()
-    slaveRpcEnv.shutdown()
+    mapWorkerRpcEnv.shutdown()
   }
 
   test("remote fetch below max RPC message size") {
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index a6776ee..5e8b25f 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -68,14 +68,14 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
   }
 
   encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf =>
-    val numSlaves = 4
+    val numWorkers = 4
     conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     conf.set(config.BROADCAST_COMPRESS, true)
-    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
+    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numWorkers), "test", conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
-    assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
+    val results = sc.parallelize(1 to numWorkers).map(x => (x, broadcast.value.sum))
+    assert(results.collect().toSet === (1 to numWorkers).map(x => (x, 10)).toSet)
   }
 
   test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
@@ -99,12 +99,12 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
   }
 
   test("Test Lazy Broadcast variables with TorrentBroadcast") {
-    val numSlaves = 2
-    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
-    val rdd = sc.parallelize(1 to numSlaves)
+    val numWorkers = 2
+    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numWorkers), "test")
+    val rdd = sc.parallelize(1 to numWorkers)
     val results = new DummyBroadcastClass(rdd).doSomething()
 
-    assert(results.toSet === (1 to numSlaves).map(x => (x, false)).toSet)
+    assert(results.toSet === (1 to numWorkers).map(x => (x, false)).toSet)
   }
 
   test("Unpersisting TorrentBroadcast on executors only in local mode") {
@@ -196,27 +196,27 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
    */
   private def testUnpersistTorrentBroadcast(distributed: Boolean,
       removeFromDriver: Boolean): Unit = {
-    val numSlaves = if (distributed) 2 else 0
+    val numWorkers = if (distributed) 2 else 0
 
     // Verify that blocks are persisted only on the driver
     def afterCreation(broadcastId: Long, bmm: BlockManagerMaster): Unit = {
       var blockId = BroadcastBlockId(broadcastId)
-      var statuses = bmm.getBlockStatus(blockId, askSlaves = true)
+      var statuses = bmm.getBlockStatus(blockId, askStorageEndpoints = true)
       assert(statuses.size === 1)
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
-      statuses = bmm.getBlockStatus(blockId, askSlaves = true)
+      statuses = bmm.getBlockStatus(blockId, askStorageEndpoints = true)
       assert(statuses.size === 1)
     }
 
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(broadcastId: Long, bmm: BlockManagerMaster): Unit = {
       var blockId = BroadcastBlockId(broadcastId)
-      val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      assert(statuses.size === numSlaves + 1)
+      val statuses = bmm.getBlockStatus(blockId, askStorageEndpoints = true)
+      assert(statuses.size === numWorkers + 1)
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
-      assert(statuses.size === numSlaves + 1)
+      assert(statuses.size === numWorkers + 1)
     }
 
     // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver
@@ -224,16 +224,16 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
     def afterUnpersist(broadcastId: Long, bmm: BlockManagerMaster): Unit = {
       var blockId = BroadcastBlockId(broadcastId)
       var expectedNumBlocks = if (removeFromDriver) 0 else 1
-      var statuses = bmm.getBlockStatus(blockId, askSlaves = true)
+      var statuses = bmm.getBlockStatus(blockId, askStorageEndpoints = true)
       assert(statuses.size === expectedNumBlocks)
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
       expectedNumBlocks = if (removeFromDriver) 0 else 1
-      statuses = bmm.getBlockStatus(blockId, askSlaves = true)
+      statuses = bmm.getBlockStatus(blockId, askStorageEndpoints = true)
       assert(statuses.size === expectedNumBlocks)
     }
 
-    testUnpersistBroadcast(distributed, numSlaves, afterCreation,
+    testUnpersistBroadcast(distributed, numWorkers, afterCreation,
       afterUsingBroadcast, afterUnpersist, removeFromDriver)
   }
 
@@ -248,7 +248,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
    */
   private def testUnpersistBroadcast(
       distributed: Boolean,
-      numSlaves: Int,  // used only when distributed = true
+      numWorkers: Int,  // used only when distributed = true
       afterCreation: (Long, BlockManagerMaster) => Unit,
       afterUsingBroadcast: (Long, BlockManagerMaster) => Unit,
       afterUnpersist: (Long, BlockManagerMaster) => Unit,
@@ -256,10 +256,10 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
 
     sc = if (distributed) {
       val _sc =
-        new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
+        new SparkContext("local-cluster[%d, 1, 1024]".format(numWorkers), "test")
       // Wait until all workers are up
       try {
-        TestUtils.waitUntilExecutorsUp(_sc, numSlaves, 60000)
+        TestUtils.waitUntilExecutorsUp(_sc, numWorkers, 60000)
         _sc
       } catch {
         case e: Throwable =>
@@ -278,7 +278,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
 
     // Use broadcast variable on all executors
     val partitions = 10
-    assert(partitions > numSlaves)
+    assert(partitions > numWorkers)
     val results = sc.parallelize(1 to partitions, partitions).map(x => (x, broadcast.value.sum))
     assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
     afterUsingBroadcast(broadcast.id, blockManagerMaster)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index eeccf56..354e6eb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -106,6 +106,9 @@ object JsonConstants {
     """
       |{"id":"id","starttime":3,"name":"name",
       |"cores":0,"user":"%s",
+      |"memoryperexecutor":1234,
+      |"resourcesperexecutor":[{"name":"gpu",
+      |"amount":3},{"name":"fpga","amount":3}],
       |"memoryperslave":1234,
       |"resourcesperslave":[{"name":"gpu",
       |"amount":3},{"name":"fpga","amount":3}],
@@ -132,7 +135,8 @@ object JsonConstants {
 
   val appDescJsonStr =
     """
-      |{"name":"name","cores":4,"memoryperslave":1234,"resourcesperslave":[],
+      |{"name":"name","cores":4,"memoryperexecutor":1234,"resourcesperexecutor":[],
+      |"memoryperslave":1234,"resourcesperslave":[],
       |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 57cbda3..c7c3ad2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -545,7 +545,7 @@ class StandaloneDynamicAllocationSuite
       // will not timeout anything related to executors.
       .set(config.Network.NETWORK_TIMEOUT.key, "2h")
       .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
-      .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
+      .set(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "1h")
   }
 
   /** Make a master to which our application will send executor requests. */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 51d20d3..70138327 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -474,7 +474,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
-  test("All shuffle files on the slave should be cleaned up when slave lost") {
+  test("All shuffle files on the storage endpoint should be cleaned up when it is lost") {
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
@@ -779,9 +779,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   private val shuffleFileLossTests = Seq(
-    ("slave lost with shuffle service", SlaveLost("", false), true, false),
-    ("worker lost with shuffle service", SlaveLost("", true), true, true),
-    ("worker lost without shuffle service", SlaveLost("", true), false, true),
+    ("executor process lost with shuffle service", ExecutorProcessLost("", false), true, false),
+    ("worker lost with shuffle service", ExecutorProcessLost("", true), true, true),
+    ("worker lost without shuffle service", ExecutorProcessLost("", true), false, true),
     ("executor failure with shuffle service", ExecutorKilled, true, false),
     ("executor failure without shuffle service", ExecutorKilled, false, true))
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a75bae5..e43be60 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -641,7 +641,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions2.length)
 
     // provide the actual loss reason for executor0
-    taskScheduler.executorLost("executor0", SlaveLost("oops"))
+    taskScheduler.executorLost("executor0", ExecutorProcessLost("oops"))
 
     // executor0's tasks should have failed now that the loss reason is known, so offering more
     // resources should make them be scheduled on the new executor.
@@ -1141,7 +1141,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     // Now we fail our second executor.  The other task can still run on executor1, so make an offer
     // on that executor, and make sure that the other task (not the failed one) is assigned there.
-    taskScheduler.executorLost("executor1", SlaveLost("oops"))
+    taskScheduler.executorLost("executor1", ExecutorProcessLost("oops"))
     val nextTaskAttempts =
       taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
     // Note: It's OK if some future change makes this already realize the taskset has become
@@ -1273,7 +1273,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(1 === taskDescriptions.length)
 
     // mark executor0 as dead
-    taskScheduler.executorLost("executor0", SlaveLost())
+    taskScheduler.executorLost("executor0", ExecutorProcessLost())
     assert(!taskScheduler.isExecutorAlive("executor0"))
     assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
     assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 759e682..95c8197 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -415,7 +415,7 @@ class TaskSetManagerSuite
 
     // Now mark host2 as dead
     sched.removeExecutor("exec2")
-    manager.executorLost("exec2", "host2", SlaveLost())
+    manager.executorLost("exec2", "host2", ExecutorProcessLost())
 
     // nothing should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None)
@@ -598,10 +598,10 @@ class TaskSetManagerSuite
       Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
     // test if the valid locality is recomputed when the executor is lost
     sched.removeExecutor("execC")
-    manager.executorLost("execC", "host2", SlaveLost())
+    manager.executorLost("execC", "host2", ExecutorProcessLost())
     assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
     sched.removeExecutor("execD")
-    manager.executorLost("execD", "host1", SlaveLost())
+    manager.executorLost("execD", "host1", ExecutorProcessLost())
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
   }
 
@@ -814,7 +814,7 @@ class TaskSetManagerSuite
     assert(resubmittedTasks === 0)
     assert(manager.runningTasks === 1)
 
-    manager.executorLost("execB", "host2", new SlaveLost())
+    manager.executorLost("execB", "host2", new ExecutorProcessLost())
     assert(manager.runningTasks === 0)
     assert(resubmittedTasks === 0)
   }
@@ -923,7 +923,7 @@ class TaskSetManagerSuite
     // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
     assert(killTaskCalled)
     // Host 3 is lost; the only task on it is 2.0, which was killed by task 2.1
-    manager.executorLost("exec3", "host3", SlaveLost())
+    manager.executorLost("exec3", "host3", ExecutorProcessLost())
     // Check the resubmittedTasks
     assert(resubmittedTasks === 0)
   }
@@ -1044,8 +1044,8 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
     sched.removeExecutor("execA")
     sched.removeExecutor("execB.2")
-    manager.executorLost("execA", "host1", SlaveLost())
-    manager.executorLost("execB.2", "host2", SlaveLost())
+    manager.executorLost("execA", "host1", ExecutorProcessLost())
+    manager.executorLost("execB.2", "host2", ExecutorProcessLost())
     clock.advance(LOCALITY_WAIT_MS * 4)
     sched.addExecutor("execC", "host3")
     manager.executorAdded()
@@ -1569,7 +1569,7 @@ class TaskSetManagerSuite
 
     assert(resubmittedTasks.isEmpty)
     // Host 2 is lost, meaning we lost the map output task4
-    manager.executorLost("exec2", "host2", SlaveLost())
+    manager.executorLost("exec2", "host2", ExecutorProcessLost())
     // Make sure that task with index 2 is re-submitted
     assert(resubmittedTasks.contains(2))
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
index 01e3d6a..3f5ffaa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
@@ -33,7 +33,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
         timeMs = 300,
         maxOnHeapMem = 10000,
         maxOffHeapMem = 20000,
-        slaveEndpoint = null,
+        storageEndpoint = null,
         if (svcEnabled) Some(new JHashMap[BlockId, BlockStatus]) else None)
       test(s"$testName externalShuffleServiceEnabled=$svcEnabled") {
         f(svcEnabled, bmInfo)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 75e755f..dc1c7cd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1374,12 +1374,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getLocations("list1").size === 0)
     assert(store.master.getLocations("list2").size === 1)
     assert(store.master.getLocations("list3").size === 1)
-    assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
-    assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
-    assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
-    assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
-    assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
-    assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
+    assert(store.master.getBlockStatus("list1", askStorageEndpoints = false).size === 0)
+    assert(store.master.getBlockStatus("list2", askStorageEndpoints = false).size === 1)
+    assert(store.master.getBlockStatus("list3", askStorageEndpoints = false).size === 1)
+    assert(store.master.getBlockStatus("list1", askStorageEndpoints = true).size === 0)
+    assert(store.master.getBlockStatus("list2", askStorageEndpoints = true).size === 1)
+    assert(store.master.getBlockStatus("list3", askStorageEndpoints = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
     store.putIterator(
@@ -1390,17 +1390,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
-    // getBlockStatus without asking slaves should have the same result
-    // getBlockStatus with asking slaves, however, should return the actual block statuses
+    // getBlockStatus without asking storage endpoints should have the same result
+    // getBlockStatus with asking storage endpoints, however, should return the actual statuses
     assert(store.master.getLocations("list4").size === 0)
     assert(store.master.getLocations("list5").size === 0)
     assert(store.master.getLocations("list6").size === 0)
-    assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
-    assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
-    assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
-    assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
-    assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
-    assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
+    assert(store.master.getBlockStatus("list4", askStorageEndpoints = false).size === 0)
+    assert(store.master.getBlockStatus("list5", askStorageEndpoints = false).size === 0)
+    assert(store.master.getBlockStatus("list6", askStorageEndpoints = false).size === 0)
+    assert(store.master.getBlockStatus("list4", askStorageEndpoints = true).size === 0)
+    assert(store.master.getBlockStatus("list5", askStorageEndpoints = true).size === 1)
+    assert(store.master.getBlockStatus("list6", askStorageEndpoints = true).size === 1)
   }
 
   test("get matching blocks") {
@@ -1416,9 +1416,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
+    assert(store.master.getMatchingBlockIds(
+      _.toString.contains("list"), askStorageEndpoints = false).size
       === 3)
-    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size
+    assert(store.master.getMatchingBlockIds(
+      _.toString.contains("list1"), askStorageEndpoints = false).size
       === 1)
 
     // insert some more blocks
@@ -1430,9 +1432,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
+    assert(
+      store.master.getMatchingBlockIds(
+        _.toString.contains("newlist"), askStorageEndpoints = false).size
       === 1)
-    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size
+    assert(
+      store.master.getMatchingBlockIds(
+        _.toString.contains("newlist"), askStorageEndpoints = true).size
       === 3)
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
@@ -1443,7 +1449,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
       case _ => false
-    }, askSlaves = true)
+    }, askStorageEndpoints = true)
     assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
   }
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 706c255..42f706b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1890,7 +1890,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     Default timeout for all network interactions. This config will be used in place of
     <code>spark.core.connection.ack.wait.timeout</code>,
-    <code>spark.storage.blockManagerSlaveTimeoutMs</code>,
+    <code>spark.storage.blockManagerHeartbeatTimeoutMs</code>,
     <code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
     <code>spark.rpc.lookupTimeout</code> if they are not configured.
   </td>
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index eaacfa4..5c19c77 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -95,7 +95,7 @@ varies across cluster managers:
 In standalone mode, simply start your workers with `spark.shuffle.service.enabled` set to `true`.
 
 In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all
-slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
+worker nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
 through Marathon.
 
 In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service).
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 6f6ae1c..578ab90 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -91,7 +91,7 @@ but Mesos can be run without ZooKeeper using a single master as well.
 ## Verification
 
 To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master webui at port
-`:5050`  Confirm that all expected machines are present in the slaves tab.
+`:5050`. Confirm that all expected machines are present in the agents tab.
 
 
 # Connecting Spark to Mesos
@@ -99,7 +99,7 @@ To verify that the Mesos cluster is ready for Spark, navigate to the Mesos maste
 To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and
 a Spark driver program configured to connect to Mesos.
 
-Alternatively, you can also install Spark in the same location in all the Mesos slaves, and configure
+Alternatively, you can also install Spark in the same location in all the Mesos agents, and configure
 `spark.mesos.executor.home` (defaults to SPARK_HOME) to point to that location.
 
 ## Authenticating to Mesos
@@ -138,7 +138,7 @@ Then submit happens as described in Client mode or Cluster mode below
 
 ## Uploading Spark Package
 
-When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
+When Mesos runs a task on a Mesos agent for the first time, that agent must have a Spark binary
 package for running the Spark Mesos executor backend.
 The Spark package can be hosted at any Hadoop-accessible URI, including HTTP via `http://`,
 [Amazon Simple Storage Service](http://aws.amazon.com/s3) via `s3n://`, or HDFS via `hdfs://`.
@@ -237,7 +237,7 @@ For example:
 {% endhighlight %}
 
 
-Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves, as the Spark driver doesn't automatically upload local jars.
+Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos agents, as the Spark driver doesn't automatically upload local jars.
 
 # Mesos Run Modes
 
@@ -360,7 +360,7 @@ see [Dynamic Resource Allocation](job-scheduling.html#dynamic-resource-allocatio
 
 The External Shuffle Service to use is the Mesos Shuffle Service. It provides shuffle data cleanup functionality
 on top of the Shuffle Service since Mesos doesn't yet support notifying another framework's
-termination. To launch it, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all slave nodes, with `spark.shuffle.service.enabled` set to `true`.
+termination. To launch it, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all agent nodes, with `spark.shuffle.service.enabled` set to `true`.
 
 This can also be achieved through Marathon, using a unique host constraint, and the following command: `./bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService`.
 
@@ -840,17 +840,17 @@ See the [configuration page](configuration.html) for information on Spark config
 A few places to look during debugging:
 
 - Mesos master on port `:5050`
-  - Slaves should appear in the slaves tab
+  - Agents should appear in the agents tab
   - Spark applications should appear in the frameworks tab
   - Tasks should appear in the details of a framework
   - Check the stdout and stderr of the sandbox of failed tasks
 - Mesos logs
-  - Master and slave logs are both in `/var/log/mesos` by default
+  - Master and agent logs are both in `/var/log/mesos` by default
 
 And common pitfalls:
 
 - Spark assembly not reachable/accessible
-  - Slaves must be able to download the Spark binary package from the `http://`, `hdfs://` or `s3n://` URL you gave
+  - Agents must be able to download the Spark binary package from the `http://`, `hdfs://` or `s3n://` URL you gave
 - Firewall blocking communications
   - Check for messages about failed connections
   - Temporarily disable firewalls for debugging and then poke appropriate holes
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index f3c479b..4344893 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -44,7 +44,7 @@ the master's web UI, which is [http://localhost:8080](http://localhost:8080) by
 
 Similarly, you can start one or more workers and connect them to the master via:
 
-    ./sbin/start-slave.sh <master-spark-URL>
+    ./sbin/start-worker.sh <master-spark-URL>
 
 Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default).
 You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
@@ -90,9 +90,9 @@ Finally, the following configuration options can be passed to the master and wor
 
 # Cluster Launch Scripts
 
-To launch a Spark standalone cluster with the launch scripts, you should create a file called conf/slaves in your Spark directory,
+To launch a Spark standalone cluster with the launch scripts, you should create a file called conf/workers in your Spark directory,
 which must contain the hostnames of all the machines where you intend to start Spark workers, one per line.
-If conf/slaves does not exist, the launch scripts defaults to a single machine (localhost), which is useful for testing.
+If conf/workers does not exist, the launch scripts default to a single machine (localhost), which is useful for testing.
 Note, the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less (using a private key) access to be set up.
 If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.
 
@@ -100,12 +100,12 @@ If you do not have a password-less setup, you can set the environment variable S
 Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/sbin`:
 
 - `sbin/start-master.sh` - Starts a master instance on the machine the script is executed on.
-- `sbin/start-slaves.sh` - Starts a worker instance on each machine specified in the `conf/slaves` file.
-- `sbin/start-slave.sh` - Starts a worker instance on the machine the script is executed on.
+- `sbin/start-workers.sh` - Starts a worker instance on each machine specified in the `conf/workers` file.
+- `sbin/start-worker.sh` - Starts a worker instance on the machine the script is executed on.
 - `sbin/start-all.sh` - Starts both a master and a number of workers as described above.
 - `sbin/stop-master.sh` - Stops the master that was started via the `sbin/start-master.sh` script.
-- `sbin/stop-slave.sh` - Stops all worker instances on the machine the script is executed on.
-- `sbin/stop-slaves.sh` - Stops all worker instances on the machines specified in the `conf/slaves` file.
+- `sbin/stop-worker.sh` - Stops all worker instances on the machine the script is executed on.
+- `sbin/stop-workers.sh` - Stops all worker instances on the machines specified in the `conf/workers` file.
 - `sbin/stop-all.sh` - Stops both the master and the workers as described above.
 
 Note that these scripts must be executed on the machine you want to run the Spark master on, not your local machine.
@@ -457,7 +457,7 @@ worker during one single schedule iteration.
 
 Spark's standalone mode offers a web-based user interface to monitor the cluster. The master and each worker has its own web UI that shows cluster and job statistics. By default, you can access the web UI for the master at port 8080. The port can be changed either in the configuration file or via command-line options.
 
-In addition, detailed log output for each job is also written to the work directory of each slave node (`SPARK_HOME/work` by default). You will see two files for each job, `stdout` and `stderr`, with all output it wrote to its console.
+In addition, detailed log output for each job is also written to the work directory of each worker node (`SPARK_HOME/work` by default). You will see two files for each job, `stdout` and `stderr`, with all output it wrote to its console.
 
 
 # Running Alongside Hadoop
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ac4aa92..587deeb 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -2216,7 +2216,7 @@ In specific cases where the amount of data that needs to be retained for the str
 ### Task Launching Overheads
 {:.no_toc}
 If the number of tasks launched per second is high (say, 50 or more per second), then the overhead
-of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second
+of sending out tasks to the executors may be significant and will make it hard to achieve sub-second
 latencies. The overhead can be reduced by the following changes:
 
 * **Execution mode**: Running Spark in Standalone mode or coarse-grained Mesos mode leads to
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index 8dc123e..b8c64a2 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -92,8 +92,8 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
   private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
     submissionState.map { state =>
       <tr>
-        <td>Mesos Slave ID</td>
-        <td>{state.slaveId.getValue}</td>
+        <td>Mesos Agent ID</td>
+        <td>{state.agentId.getValue}</td>
       </tr>
       <tr>
         <td>Mesos Task ID</td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
index 173a9b8..7729063 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -41,7 +41,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
 
     val queuedHeaders = driverHeader ++ submissionHeader
     val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
-      Seq("Start Date", "Mesos Slave ID", "State") ++ sandboxHeader
+      Seq("Start Date", "Mesos Agent ID", "State") ++ sandboxHeader
     val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
       Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
     val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
@@ -81,7 +81,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
 
     val sandboxCol = if (proxy.isDefined) {
       val clusterSchedulerId = parent.scheduler.getSchedulerState().frameworkId
-      val sandBoxUri = s"${proxy.get}/#/agents/${state.slaveId.getValue}/frameworks/" +
+      val sandBoxUri = s"${proxy.get}/#/agents/${state.agentId.getValue}/frameworks/" +
         s"${clusterSchedulerId}/executors/${id}/browse"
       <a href={sandBoxUri}>Sandbox</a>
     } else {
@@ -103,7 +103,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
       <td>{state.driverDescription.command.mainClass}</td>
       <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
       <td>{UIUtils.formatDate(state.startDate)}</td>
-      <td>{state.slaveId.getValue}</td>
+      <td>{state.agentId.getValue}</td>
       <td>{stateString(state.mesosTaskStatus)}</td>
       <td>{sandboxCol}</td>
     </tr>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 47243e8..b023cf1 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -26,18 +26,16 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SparkConf, SparkEnv, TaskState}
-import org.apache.spark.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.EXECUTOR_ID
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.TaskDescription
-import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils
+import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
 import org.apache.spark.util.Utils
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
-  with MesosSchedulerUtils // TODO: fix
   with ExecutorBackend
   with Logging {
 
@@ -48,7 +46,7 @@ private[spark] class MesosExecutorBackend
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
     driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
       .setTaskId(mesosTaskId)
-      .setState(taskStateToMesos(state))
+      .setState(MesosSchedulerBackendUtil.taskStateToMesos(state))
       .setData(ByteString.copyFrom(data))
       .build())
   }
@@ -57,7 +55,7 @@ private[spark] class MesosExecutorBackend
       driver: ExecutorDriver,
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo): Unit = {
+      agentInfo: SlaveInfo): Unit = {
 
     // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
     val cpusPerTask = executorInfo.getResourcesList.asScala
@@ -78,11 +76,11 @@ private[spark] class MesosExecutorBackend
     val conf = new SparkConf(loadDefaults = true).setAll(properties)
     conf.set(EXECUTOR_ID, executorId)
     val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
+      conf, executorId, agentInfo.getHostname, cpusPerTask, None, isLocal = false)
 
     executor = new Executor(
       executorId,
-      slaveInfo.getHostname,
+      agentInfo.getHostname,
       env,
       resources = Map.empty[String, ResourceInformation])
   }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 289b109..edcdb92 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.mesos.{Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{SlaveID => AgentID, TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 
@@ -41,7 +41,7 @@ import org.apache.spark.util.Utils
  * @param driverDescription Submitted driver description from
  * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
  * @param taskId Mesos TaskID generated for the task
- * @param slaveId Slave ID that the task is assigned to
+ * @param agentId Agent ID that the task is assigned to
  * @param mesosTaskStatus The last known task status update.
  * @param startDate The date the task was launched
  * @param finishDate The date the task finished
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
 private[spark] class MesosClusterSubmissionState(
     val driverDescription: MesosDriverDescription,
     val taskId: TaskID,
-    val slaveId: SlaveID,
+    val agentId: AgentID,
     var mesosTaskStatus: Option[TaskStatus],
     var startDate: Date,
     var finishDate: Option[Date],
@@ -59,7 +59,7 @@ private[spark] class MesosClusterSubmissionState(
 
   def copy(): MesosClusterSubmissionState = {
     new MesosClusterSubmissionState(
-      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
+      driverDescription, taskId, agentId, mesosTaskStatus, startDate, finishDate, frameworkId)
   }
 }
 
@@ -113,7 +113,7 @@ private[spark] class MesosDriverState(
  * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
  * as Mesos tasks in a Mesos cluster.
  * All drivers are launched asynchronously by the framework, which will eventually be launched
- * by one of the slaves in the cluster. The results of the driver will be stored in slave's task
+ * by one of the agents in the cluster. The results of the driver will be stored in agent's task
  * sandbox which is accessible by visiting the Mesos UI.
  * This scheduler supports recovery by persisting all its state and performs task reconciliation
  * on recover, which gets all the latest state for all the drivers from Mesos master.
@@ -121,7 +121,7 @@ private[spark] class MesosDriverState(
 private[spark] class MesosClusterScheduler(
     engineFactory: MesosClusterPersistenceEngineFactory,
     conf: SparkConf)
-  extends Scheduler with MesosSchedulerUtils {
+  extends Scheduler with MesosSchedulerUtils with MesosScheduler {
   var frameworkUrl: String = _
   private val metricsSystem =
     MetricsSystem.createMetricsSystem(MetricsSystemInstances.MESOS_CLUSTER, conf,
@@ -139,10 +139,10 @@ private[spark] class MesosClusterScheduler(
   private var frameworkId: String = null
   // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
-  // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
+  // Holds a map of driver id to expected agent id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
   // state of the tasks from Mesos. Keyed by task Id.
-  private val pendingRecover = new mutable.HashMap[String, SlaveID]()
+  private val pendingRecover = new mutable.HashMap[String, AgentID]()
   // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
   // All supervised drivers that are waiting to retry after termination, keyed by submission id
@@ -277,7 +277,7 @@ private[spark] class MesosClusterScheduler(
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
         launchedDrivers(state.driverDescription.submissionId) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        pendingRecover(state.taskId.getValue) = state.agentId
       }
       queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
       // There is potential timing issue where a queued driver might have been launched
@@ -348,10 +348,10 @@ private[spark] class MesosClusterScheduler(
       if (!pendingRecover.isEmpty) {
         // Start task reconciliation if we need to recover.
         val statuses = pendingRecover.collect {
-          case (taskId, slaveId) =>
+          case (taskId, agentId) =>
             val newStatus = TaskStatus.newBuilder()
               .setTaskId(TaskID.newBuilder().setValue(taskId).build())
-              .setSlaveId(slaveId)
+              .setSlaveId(agentId)
               .setState(MesosTaskState.TASK_STAGING)
               .build()
             launchedDrivers.get(getSubmissionIdFromTaskId(taskId))
@@ -657,7 +657,7 @@ private[spark] class MesosClusterScheduler(
             finishedDrivers += new MesosClusterSubmissionState(
               submission,
               TaskID.newBuilder().setValue(submission.submissionId).build(),
-              SlaveID.newBuilder().setValue("").build(),
+              AgentID.newBuilder().setValue("").build(),
               None,
               null,
               None,
@@ -731,7 +731,7 @@ private[spark] class MesosClusterScheduler(
   override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
     logInfo(s"Framework re-registered with master ${masterInfo.getId}")
   }
-  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
+  override def agentLost(driver: SchedulerDriver, agentId: AgentID): Unit = {}
   override def error(driver: SchedulerDriver, error: String): Unit = {
     logError("Error received: " + error)
     markErr()
@@ -815,13 +815,13 @@ private[spark] class MesosClusterScheduler(
   override def frameworkMessage(
       driver: SchedulerDriver,
       executorId: ExecutorID,
-      slaveId: SlaveID,
+      agentId: AgentID,
       message: Array[Byte]): Unit = {}
 
   override def executorLost(
       driver: SchedulerDriver,
       executorId: ExecutorID,
-      slaveId: SlaveID,
+      agentId: AgentID,
       status: Int): Unit = {}
 
   private def removeFromQueuedDrivers(subId: String): Boolean = {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 0b44702..5e7a29a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.Protos.{SlaveID => AgentID, TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
@@ -40,7 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
-import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorProcessLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
@@ -60,10 +60,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     master: String,
     securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-    with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+  with MesosScheduler
+  with MesosSchedulerUtils {
 
-  // Blacklist a slave after this many failures
-  private val MAX_SLAVE_FAILURES = 2
+  // Blacklist an agent after this many failures
+  private val MAX_AGENT_FAILURES = 2
 
   private val maxCoresOption = conf.get(config.CORES_MAX)
 
@@ -116,10 +117,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // executor limit
   private var launchingExecutors = false
 
-  // SlaveID -> Slave
-  // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
+  // AgentID -> Agent
+  // This map accumulates entries for the duration of the job.  Agents are never deleted, because
   // we need to maintain e.g. failure state and connection state.
-  private val slaves = new mutable.HashMap[String, Slave]
+  private val agents = new mutable.HashMap[String, Agent]
 
   /**
    * The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -147,7 +148,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val extraCoresPerExecutor = conf.get(EXTRA_CORES_PER_EXECUTOR)
 
   // Offer constraints
-  private val slaveOfferConstraints =
+  private val agentOfferConstraints =
     parseConstraintString(sc.conf.get(CONSTRAINTS))
 
   // Reject offers with mismatched constraints in seconds
@@ -354,7 +355,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   /**
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * Method called by Mesos to offer resources on agents. We respond by launching an executor,
    * unless we've already launched more than we wanted to.
    */
   override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]): Unit = {
@@ -384,7 +385,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
       val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(agentOfferConstraints, offerAttributes)
       }
 
       declineUnmatchedOffers(d, unmatchedOffers)
@@ -441,7 +442,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
 
           logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
-            s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
+            s" ports: $ports" + s" on agent with agent id: ${task.getSlaveId.getValue} ")
         }
 
         driver.launchTasks(
@@ -495,18 +496,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
     var launchTasks = true
 
-    // TODO(mgummelt): combine offers for a single slave
+    // TODO(mgummelt): combine offers for a single agent
     //
     // round-robin create executors on the available offers
     while (launchTasks) {
       launchTasks = false
 
       for (offer <- offers) {
-        val slaveId = offer.getSlaveId.getValue
+        val agentId = offer.getSlaveId.getValue
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
 
-        if (canLaunchTask(slaveId, offer.getHostname, resources)) {
+        if (canLaunchTask(agentId, offer.getHostname, resources)) {
           // Create a task
           launchTasks = true
           val taskId = newMesosTaskId()
@@ -517,7 +518,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           val taskCPUs = executorCores(offerCPUs)
           val taskMemory = executorMemory(sc)
 
-          slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
+          agents.getOrElseUpdate(agentId, new Agent(offer.getHostname)).taskIDs.add(taskId)
 
           val (resourcesLeft, resourcesToUse) =
             partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
@@ -540,8 +541,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             gpusByTaskId(taskId) = taskGPUs
           }
         } else {
-          logDebug(s"Cannot launch a task for offer with id: $offerId on slave " +
-            s"with id: $slaveId. Requirements were not met for this offer.")
+          logDebug(s"Cannot launch a task for offer with id: $offerId on agent " +
+            s"with id: $agentId. Requirements were not met for this offer.")
         }
       }
     }
@@ -573,7 +574,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
   }
 
-  private def canLaunchTask(slaveId: String, offerHostname: String,
+  private def canLaunchTask(agentId: String, offerHostname: String,
                             resources: JList[Resource]): Boolean = {
     val offerMem = getResource(resources, "mem")
     val offerCPUs = getResource(resources, "cpus").toInt
@@ -587,7 +588,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
       numExecutors < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
+      agents.get(agentId).map(_.taskFailures).getOrElse(0) < MAX_AGENT_FAILURES &&
       meetsPortRequirements &&
       satisfiesLocality(offerHostname)
   }
@@ -606,7 +607,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
 
     // Check the locality information
-    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val currentHosts = agents.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
     val allDesiredHosts = hostToLocalTaskCount.map { case (k, v) => k }.toSet
 
     // Try to match locality for hosts which do not have executors yet, to potentially
@@ -622,13 +623,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus): Unit = {
     val taskId = status.getTaskId.getValue
-    val slaveId = status.getSlaveId.getValue
+    val agentId = status.getSlaveId.getValue
     val state = mesosToTaskState(status.getState)
 
     logInfo(s"Mesos task $taskId is now ${status.getState}")
 
     stateLock.synchronized {
-      val slave = slaves(slaveId)
+      val agent = agents(agentId)
 
       // If the shuffle service is enabled, have the driver register with each one of the
       // shuffle services. This allows the shuffle services to clean up state associated with
@@ -636,23 +637,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       // this through Mesos, since the shuffle services are set up independently.
       if (state.equals(TaskState.RUNNING) &&
           shuffleServiceEnabled &&
-          !slave.shuffleRegistered) {
+          !agent.shuffleRegistered) {
         assume(mesosExternalShuffleClient.isDefined,
           "External shuffle client was not instantiated even though shuffle service is enabled.")
         // TODO: Remove this and allow the MesosExternalShuffleService to detect
         // framework termination when new Mesos Framework HTTP API is available.
         val externalShufflePort = conf.get(config.SHUFFLE_SERVICE_PORT)
 
-        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
-            s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
+        logDebug(s"Connecting to shuffle service on agent $agentId, " +
+            s"host ${agent.hostname}, port $externalShufflePort for app ${conf.getAppId}")
 
         mesosExternalShuffleClient.get
           .registerDriverWithShuffleService(
-            slave.hostname,
+            agent.hostname,
             externalShufflePort,
-            sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
+            sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT),
             sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
-        slave.shuffleRegistered = true
+        agent.shuffleRegistered = true
       }
 
       if (TaskState.isFinished(state)) {
@@ -666,16 +667,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalGpusAcquired -= gpus
           gpusByTaskId -= taskId
         }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
+        // If it was a failure, mark the agent as failed for blacklisting purposes
         if (TaskState.isFailed(state)) {
-          slave.taskFailures += 1
+          agent.taskFailures += 1
 
-          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
+          if (agent.taskFailures >= MAX_AGENT_FAILURES) {
+            logInfo(s"Blacklisting Mesos agent $agentId due to too many failures; " +
                 "is Spark installed on it?")
           }
         }
-        executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
+        executorTerminated(d, agentId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
         d.reviveOffers()
       }
@@ -708,7 +709,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     // See SPARK-12330
     val startTime = System.nanoTime()
 
-    // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
+    // agentIdsWithExecutors has no memory barrier, so this is eventually consistent
     while (numExecutors() > 0 &&
       System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
       Thread.sleep(100)
@@ -729,15 +730,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   override def frameworkMessage(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]): Unit = {}
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: AgentID, b: Array[Byte]): Unit = {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * Called when an agent is lost or a Mesos task finished. Updates local view on
    * what tasks are running. It also notifies the driver that an executor was removed.
    */
   private def executorTerminated(
       d: org.apache.mesos.SchedulerDriver,
-      slaveId: String,
+      agentId: String,
       taskId: String,
       reason: String): Unit = {
     stateLock.synchronized {
@@ -745,18 +746,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       // removeExecutor() internally will send a message to the driver endpoint but
       // the driver endpoint is not available now, otherwise an exception will be thrown.
       if (!stopCalled) {
-        removeExecutor(taskId, SlaveLost(reason))
+        removeExecutor(taskId, ExecutorProcessLost(reason))
       }
-      slaves(slaveId).taskIDs.remove(taskId)
+      agents(agentId).taskIDs.remove(taskId)
     }
   }
 
-  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
-    logInfo(s"Mesos slave lost: ${slaveId.getValue}")
+  override def agentLost(d: org.apache.mesos.SchedulerDriver, agentId: AgentID): Unit = {
+    logInfo(s"Mesos agent lost: ${agentId.getValue}")
   }
 
   override def executorLost(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: AgentID, status: Int): Unit = {
     logInfo("Mesos executor lost: %s".format(e.getValue))
   }
 
@@ -770,7 +771,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       resourceProfileToTotalExecs: Map[ResourceProfile, Int]
   ): Future[Boolean] = Future.successful {
     // We don't truly know if we can fulfill the full amount of executors
-    // since at coarse grain it depends on the amount of slaves available.
+    // since at coarse grain it depends on the amount of agents available.
     val numExecs = resourceProfileToTotalExecs.getOrElse(defaultProfile, 0)
     logInfo("Capping the total amount of executors to " + numExecs)
     executorLimitOption = Some(numExecs)
@@ -800,11 +801,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def numExecutors(): Int = {
-    slaves.values.map(_.taskIDs.size).sum
+    agents.values.map(_.taskIDs.size).sum
   }
 }
 
-private class Slave(val hostname: String) {
+private class Agent(val hostname: String) {
   val taskIDs = new mutable.HashSet[String]()
   var taskFailures = 0
   var shuffleRegistered = false
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index f1e3fca..586c2bd 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -23,7 +23,8 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, SlaveID => AgentID,
+  TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.protobuf.ByteString
 
@@ -46,12 +47,12 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     sc: SparkContext,
     master: String)
   extends SchedulerBackend
-  with org.apache.mesos.Scheduler
+  with MesosScheduler
   with MesosSchedulerUtils {
 
-  // Stores the slave ids that has launched a Mesos executor.
-  val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
-  val taskIdToSlaveId = new HashMap[Long, String]
+  // Stores the agent ids that have launched a Mesos executor.
+  val agentIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
+  val taskIdToAgentId = new HashMap[Long, String]
 
   // An ExecutorInfo for our tasks
   var execArgs: Array[Byte] = null
@@ -64,7 +65,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   private[mesos] val mesosExecutorCores = sc.conf.get(mesosConfig.EXECUTOR_CORES)
 
   // Offer constraints
-  private[this] val slaveOfferConstraints =
+  private[this] val agentOfferConstraints =
     parseConstraintString(sc.conf.get(mesosConfig.CONSTRAINTS))
 
   // reject offers with mismatched constraints in seconds
@@ -217,7 +218,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     val builder = new StringBuilder
     tasks.asScala.foreach { t =>
       builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
-        .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+        .append("Agent id: ").append(t.getSlaveId.getValue).append("\n")
         .append("Task resources: ").append(t.getResourcesList).append("\n")
         .append("Executor resources: ").append(t.getExecutor.getResourcesList)
         .append("---------------------------------------------\n")
@@ -226,7 +227,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   }
 
   /**
-   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+   * Method called by Mesos to offer resources on agents. We respond by asking our active task sets
    * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
    * tasks are balanced across the cluster.
    */
@@ -237,7 +238,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
         offers.asScala.partition { o =>
           val offerAttributes = toAttributeMap(o.getAttributesList)
           val meetsConstraints =
-            matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+            matchesAttributeRequirements(agentOfferConstraints, offerAttributes)
 
           // add some debug messaging
           if (!meetsConstraints) {
@@ -259,7 +260,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
-        val slaveId = o.getSlaveId.getValue
+        val agentId = o.getSlaveId.getValue
         val offerAttributes = toAttributeMap(o.getAttributesList)
 
         // check offers for
@@ -269,7 +270,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
         val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
         val meetsRequirements =
           (meetsMemoryRequirements && meetsCPURequirements) ||
-          (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+          (agentIdToExecutorInfo.contains(agentId) && cpus >= scheduler.CPUS_PER_TASK)
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
         logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
           + s"$offerAttributes mem: $mem cpu: $cpus")
@@ -281,10 +282,10 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       unUsableOffers.foreach(o => d.declineOffer(o.getId))
 
       val workerOffers = usableOffers.map { o =>
-        val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
+        val cpus = if (agentIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
         } else {
-          // If the Mesos executor has not been started on this slave yet, set aside a few
+          // If the Mesos executor has not been started on this agent yet, set aside a few
           // cores for the Mesos executor by offering fewer cores to the Spark executor
           (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
         }
@@ -294,51 +295,51 @@ private[spark] class MesosFineGrainedSchedulerBackend(
           cpus)
       }.toIndexedSeq
 
-      val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
-      val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
-      val slaveIdToResources = new HashMap[String, JList[Resource]]()
+      val agentIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
+      val agentIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+      val agentIdToResources = new HashMap[String, JList[Resource]]()
       usableOffers.foreach { o =>
-        slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+        agentIdToResources(o.getSlaveId.getValue) = o.getResourcesList
       }
 
       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
 
-      val slavesIdsOfAcceptedOffers = HashSet[String]()
+      val agentsIdsOfAcceptedOffers = HashSet[String]()
 
       // Call into the TaskSchedulerImpl
       val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
       acceptedOffers
         .foreach { offer =>
           offer.foreach { taskDesc =>
-            val slaveId = taskDesc.executorId
-            slavesIdsOfAcceptedOffers += slaveId
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            val agentId = taskDesc.executorId
+            agentsIdsOfAcceptedOffers += agentId
+            taskIdToAgentId(taskDesc.taskId) = agentId
             val (mesosTask, remainingResources) = createMesosTask(
               taskDesc,
-              slaveIdToResources(slaveId),
-              slaveId)
-            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+              agentIdToResources(agentId),
+              agentId)
+            mesosTasks.getOrElseUpdate(agentId, new JArrayList[MesosTaskInfo])
               .add(mesosTask)
-            slaveIdToResources(slaveId) = remainingResources
+            agentIdToResources(agentId) = remainingResources
           }
         }
 
       // Reply to the offers
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
 
-      mesosTasks.foreach { case (slaveId, tasks) =>
-        slaveIdToWorkerOffer.get(slaveId).foreach(o =>
-          listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
+      mesosTasks.foreach { case (agentId, tasks) =>
+        agentIdToWorkerOffer.get(agentId).foreach(o =>
+          listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), agentId,
             // TODO: Add support for log urls for Mesos
             new ExecutorInfo(o.host, o.cores, Map.empty, Map.empty)))
         )
-        logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
-        d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
+        logTrace(s"Launching Mesos tasks on agent '$agentId', tasks:\n${getTasksSummary(tasks)}")
+        d.launchTasks(Collections.singleton(agentIdToOffer(agentId).getId), tasks, filters)
       }
 
       // Decline offers that weren't used
       // NOTE: This logic assumes that we only get a single offer for each host in a given batch
-      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
+      for (o <- usableOffers if !agentsIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
         d.declineOffer(o.getId)
       }
     }
@@ -348,19 +349,19 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   def createMesosTask(
       task: TaskDescription,
       resources: JList[Resource],
-      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
+      agentId: String): (MesosTaskInfo, JList[Resource]) = {
     val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
-      (slaveIdToExecutorInfo(slaveId), resources)
+    val (executorInfo, remainingResources) = if (agentIdToExecutorInfo.contains(agentId)) {
+      (agentIdToExecutorInfo(agentId), resources)
     } else {
-      createExecutorInfo(resources, slaveId)
+      createExecutorInfo(resources, agentId)
     }
-    slaveIdToExecutorInfo(slaveId) = executorInfo
+    agentIdToExecutorInfo(agentId) = executorInfo
     val (finalResources, cpuResources) =
       partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
     val taskInfo = MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setSlaveId(AgentID.newBuilder().setValue(agentId).build())
       .setExecutor(executorInfo)
       .setName(task.name)
       .addAllResources(cpuResources.asJava)
@@ -375,12 +376,12 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       val state = mesosToTaskState(status.getState)
       synchronized {
         if (TaskState.isFailed(mesosToTaskState(status.getState))
-          && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
+          && taskIdToAgentId.contains(tid)) {
+          // We lost the executor on this agent, so remember that it's gone
+          removeExecutor(taskIdToAgentId(tid), "Lost executor")
         }
         if (TaskState.isFinished(state)) {
-          taskIdToSlaveId.remove(tid)
+          taskIdToAgentId.remove(tid)
         }
       }
       scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
@@ -406,39 +407,39 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   }
 
   override def frameworkMessage(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]): Unit = {}
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: AgentID, b: Array[Byte]): Unit = {}
 
   /**
-   * Remove executor associated with slaveId in a thread safe manner.
+   * Remove executor associated with agentId in a thread safe manner.
    */
-  private def removeExecutor(slaveId: String, reason: String) = {
+  private def removeExecutor(agentId: String, reason: String) = {
     synchronized {
-      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
-      slaveIdToExecutorInfo -= slaveId
+      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), agentId, reason))
+      agentIdToExecutorInfo -= agentId
     }
   }
 
-  private def recordSlaveLost(
-      d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason): Unit = {
+  private def recordAgentLost(
+      d: org.apache.mesos.SchedulerDriver, agentId: AgentID, reason: ExecutorLossReason): Unit = {
     inClassLoader() {
-      logInfo("Mesos slave lost: " + slaveId.getValue)
-      removeExecutor(slaveId.getValue, reason.toString)
-      scheduler.executorLost(slaveId.getValue, reason)
+      logInfo("Mesos agent lost: " + agentId.getValue)
+      removeExecutor(agentId.getValue, reason.toString)
+      scheduler.executorLost(agentId.getValue, reason)
     }
   }
 
-  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
-    recordSlaveLost(d, slaveId, SlaveLost())
+  override def agentLost(d: org.apache.mesos.SchedulerDriver, agentId: AgentID): Unit = {
+    recordAgentLost(d, agentId, ExecutorProcessLost())
   }
 
   override def executorLost(
       d: org.apache.mesos.SchedulerDriver,
       executorId: ExecutorID,
-      slaveId: SlaveID,
+      agentId: AgentID,
       status: Int): Unit = {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
-                                                                 slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
+    logInfo("Executor lost: %s, marking agent %s as lost".format(executorId.getValue,
+                                                                 agentId.getValue))
+    recordAgentLost(d, agentId, ExecutorExited(status, exitCausedByApp = true))
   }
 
   override def killTask(
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosScheduler.scala
new file mode 100644
index 0000000..f55b9ef
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosScheduler.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.{SlaveID => AgentID}
+
+trait MesosScheduler extends org.apache.mesos.Scheduler {
+  override def slaveLost(d: org.apache.mesos.SchedulerDriver, agentId: AgentID): Unit = {
+    agentLost(d, agentId)
+  }
+
+  def agentLost(d: org.apache.mesos.SchedulerDriver, agentId: AgentID): Unit
+}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 7b2f6a2..981b8e9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,22 +17,23 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Environment, Image, NetworkInfo, Parameter, Secret, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Environment, Image, NetworkInfo, Parameter, Secret,
+  TaskState => MesosTaskState, Volume}
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.protobuf.ByteString
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskState}
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.mesos.config.MesosSecretConfig
 import org.apache.spark.internal.Logging
 
 /**
- * A collection of utility functions which can be used by both the
- * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]].
+ * A collection of utility functions which can be used by the
+ * MesosSchedulerBackend, [[MesosFineGrainedSchedulerBackend]] and the MesosExecutorBackend.
  */
-private[mesos] object MesosSchedulerBackendUtil extends Logging {
+private[spark] object MesosSchedulerBackendUtil extends Logging {
   /**
    * Parse a list of volume specs, each of which
    * takes the form [host-dir:]container-dir[:rw|:ro].
@@ -294,4 +295,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .setImage(imageProto)
       .build
   }
+
+  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
+    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
+    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
+    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
+    case TaskState.FAILED => MesosTaskState.TASK_FAILED
+    case TaskState.KILLED => MesosTaskState.TASK_KILLED
+    case TaskState.LOST => MesosTaskState.TASK_LOST
+  }
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index ed3bd35..5784ee3 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 import com.google.common.base.Splitter
 import com.google.common.io.Files
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{SlaveID => AgentID, TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.FrameworkInfo.Capability
 import org.apache.mesos.Protos.Resource.ReservationInfo
 import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}
@@ -304,12 +304,12 @@ trait MesosSchedulerUtils extends Logging {
    * Match the requirements (if any) to the offer attributes.
    * if attribute requirements are not specified - return true
    * else if attribute is defined and no values are given, simple attribute presence is performed
-   * else if attribute name and value is specified, subset match is performed on slave attributes
+   * else if attribute name and value are specified, subset match is performed on agent attributes
    */
   def matchesAttributeRequirements(
-      slaveOfferConstraints: Map[String, Set[String]],
+      agentOfferConstraints: Map[String, Set[String]],
       offerAttributes: Map[String, GeneratedMessageV3]): Boolean = {
-    slaveOfferConstraints.forall {
+    agentOfferConstraints.forall {
       // offer has the required attribute and subsumes the required values for that attribute
       case (name, requiredValues) =>
         offerAttributes.get(name) match {
@@ -574,15 +574,6 @@ trait MesosSchedulerUtils extends Logging {
          MesosTaskState.TASK_UNREACHABLE => TaskState.LOST
   }
 
-  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
-    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
-    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
-    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
-    case TaskState.FAILED => MesosTaskState.TASK_FAILED
-    case TaskState.KILLED => MesosTaskState.TASK_KILLED
-    case TaskState.LOST => MesosTaskState.TASK_LOST
-  }
-
   protected def declineOffer(
     driver: org.apache.mesos.SchedulerDriver,
     offer: Offer,
@@ -612,4 +603,3 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 }
-
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 9a50142..bb37bbd 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -413,7 +413,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
         Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date()))
     assert(response.success)
-    val slaveId = SlaveID.newBuilder().setValue("s1").build()
+    val agentId = SlaveID.newBuilder().setValue("s1").build()
     val offer = Offer.newBuilder()
       .addResources(
         Resource.newBuilder().setRole("*")
@@ -425,7 +425,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
           .setType(Type.SCALAR))
       .setId(OfferID.newBuilder().setValue("o1").build())
       .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
-      .setSlaveId(slaveId)
+      .setSlaveId(agentId)
       .setHostname("host1")
       .build()
     // Offer the resource to launch the submitted driver
@@ -438,7 +438,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
 
     val taskStatus = TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
-      .setSlaveId(slaveId)
+      .setSlaveId(agentId)
       .setState(MesosTaskState.TASK_KILLED)
       .build()
     // Update the status of the killed task
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 5ab277ed..4d7f644 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -105,7 +105,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.statusUpdate(driver, status)
     verify(driver, times(1)).reviveOffers()
 
-    // Launches a new task on a valid offer from the same slave
+    // Launches a new task on a valid offer from the same agent
     offerResources(List(offer2))
     verifyTaskLaunched(driver, "o2")
   }
@@ -250,7 +250,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o2")
   }
 
-  test("mesos creates multiple executors on a single slave") {
+  test("mesos creates multiple executors on a single agent") {
     val executorCores = 4
     setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString))
 
@@ -727,10 +727,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
-  private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+  private def registerMockExecutor(executorId: String, agentId: String, cores: Integer) = {
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
-    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty,
+    val message = RegisterExecutor(executorId, mockEndpointRef, agentId, cores, Map.empty,
       Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     backend.driverEndpoint.askSync[Boolean](message)
@@ -766,10 +766,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     }
   }
 
-  private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
+  private def createTaskStatus(taskId: String, agentId: String, state: TaskState): TaskStatus = {
     TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(taskId).build())
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setSlaveId(SlaveID.newBuilder().setValue(agentId).build())
       .setState(state)
       .build
   }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 5a4bf1d..92676cc 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -46,7 +46,7 @@ object Utils {
 
   def createOffer(
                    offerId: String,
-                   slaveId: String,
+                   agentId: String,
                    mem: Int,
                    cpus: Int,
                    ports: Option[(Long, Long)] = None,
@@ -77,8 +77,8 @@ object Utils {
     builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
       .setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
-      .setHostname(s"host${slaveId}")
+      .setSlaveId(SlaveID.newBuilder().setValue(agentId))
+      .setHostname(s"host${agentId}")
       .addAllAttributes(attributes.asJava)
       .build()
   }
@@ -101,8 +101,8 @@ object Utils {
     OfferID.newBuilder().setValue(offerId).build()
   }
 
-  def createSlaveId(slaveId: String): SlaveID = {
-    SlaveID.newBuilder().setValue(slaveId).build()
+  def createAgentId(agentId: String): SlaveID = {
+    SlaveID.newBuilder().setValue(agentId).build()
   }
 
   def createExecutorId(executorId: String): ExecutorID = {
@@ -227,4 +227,3 @@ object Utils {
       .build()
   }
 }
-
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 0475b0a..3f2e884 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -264,13 +264,14 @@ private[spark] abstract class YarnSchedulerBackend(
               case NonFatal(e) =>
                 logWarning(s"Attempted to get executor loss reason" +
                   s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
-                  s" but got no response. Marking as slave lost.", e)
-                RemoveExecutor(executorId, SlaveLost())
+                  s" but got no response. Marking as agent lost.", e)
+                RemoveExecutor(executorId, ExecutorProcessLost())
             }(ThreadUtils.sameThread)
         case None =>
           logWarning("Attempted to check for an executor loss reason" +
             " before the AM has registered!")
-          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+          Future.successful(RemoveExecutor(executorId,
+            ExecutorProcessLost("AM is not yet registered.")))
       }
 
       removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh
old mode 100644
new mode 100755
index 4bbf257..858bede
--- a/sbin/decommission-slave.sh
+++ b/sbin/decommission-slave.sh
@@ -17,41 +17,7 @@
 # limitations under the License.
 #
 
-# A shell script to decommission all workers on a single slave
-#
-# Environment variables
-#
-#   SPARK_WORKER_INSTANCES The number of worker instances that should be
-#                          running on this slave.  Default is 1.
-
-# Usage: decommission-slave.sh [--block-until-exit]
-#   Decommissions all slaves on this worker machine
-
-set -ex
-
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-. "${SPARK_HOME}/sbin/spark-config.sh"
-
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
-  "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
-else
-  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
-    "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
-  done
-fi
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-# Check if --block-until-exit is set.
-# This is done for systems which block on the decomissioning script and on exit
-# shut down the entire system (e.g. K8s).
-if [ "$1" == "--block-until-exit" ]; then
-  shift
-  # For now we only block on the 0th instance if there multiple instances.
-  instance=$1
-  pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
-  wait $pid
-fi
+>&2 echo "This script is deprecated, use decommission-worker.sh"
+"${DIR}/decommission-worker.sh" "$@"
diff --git a/sbin/decommission-slave.sh b/sbin/decommission-worker.sh
old mode 100644
new mode 100755
similarity index 87%
copy from sbin/decommission-slave.sh
copy to sbin/decommission-worker.sh
index 4bbf257..cf81a53
--- a/sbin/decommission-slave.sh
+++ b/sbin/decommission-worker.sh
@@ -17,15 +17,15 @@
 # limitations under the License.
 #
 
-# A shell script to decommission all workers on a single slave
+# A shell script to decommission all workers on a single machine
 #
 # Environment variables
 #
 #   SPARK_WORKER_INSTANCES The number of worker instances that should be
-#                          running on this slave.  Default is 1.
+#                          running on this worker machine.  Default is 1.
 
-# Usage: decommission-slave.sh [--block-until-exit]
-#   Decommissions all slaves on this worker machine
+# Usage: decommission-worker.sh [--block-until-exit]
+#   Decommissions all workers on this worker machine.
 
 set -ex
 
diff --git a/sbin/slaves.sh b/sbin/slaves.sh
index c971aa3..b92007e 100755
--- a/sbin/slaves.sh
+++ b/sbin/slaves.sh
@@ -17,87 +17,7 @@
 # limitations under the License.
 #
 
-# Run a shell command on all slave hosts.
-#
-# Environment Variables
-#
-#   SPARK_SLAVES    File naming remote hosts.
-#     Default is ${SPARK_CONF_DIR}/slaves.
-#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
-#   SPARK_SLAVE_SLEEP Seconds to sleep between spawning remote commands.
-#   SPARK_SSH_OPTS Options passed to ssh when running remote commands.
-##
-
-usage="Usage: slaves.sh [--config <conf-dir>] command..."
-
-# if no args specified, show usage
-if [ $# -le 0 ]; then
-  echo $usage
-  exit 1
-fi
-
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-. "${SPARK_HOME}/sbin/spark-config.sh"
-
-# If the slaves file is specified in the command line,
-# then it takes precedence over the definition in
-# spark-env.sh. Save it here.
-if [ -f "$SPARK_SLAVES" ]; then
-  HOSTLIST=`cat "$SPARK_SLAVES"`
-fi
-
-# Check if --config is passed as an argument. It is an optional parameter.
-# Exit if the argument is not a directory.
-if [ "$1" == "--config" ]
-then
-  shift
-  conf_dir="$1"
-  if [ ! -d "$conf_dir" ]
-  then
-    echo "ERROR : $conf_dir is not a directory"
-    echo $usage
-    exit 1
-  else
-    export SPARK_CONF_DIR="$conf_dir"
-  fi
-  shift
-fi
-
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-if [ "$HOSTLIST" = "" ]; then
-  if [ "$SPARK_SLAVES" = "" ]; then
-    if [ -f "${SPARK_CONF_DIR}/slaves" ]; then
-      HOSTLIST=`cat "${SPARK_CONF_DIR}/slaves"`
-    else
-      HOSTLIST=localhost
-    fi
-  else
-    HOSTLIST=`cat "${SPARK_SLAVES}"`
-  fi
-fi
-
-
-
-# By default disable strict host key checking
-if [ "$SPARK_SSH_OPTS" = "" ]; then
-  SPARK_SSH_OPTS="-o StrictHostKeyChecking=no"
-fi
-
-for slave in `echo "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
-  if [ -n "${SPARK_SSH_FOREGROUND}" ]; then
-    ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
-      2>&1 | sed "s/^/$slave: /"
-  else
-    ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
-      2>&1 | sed "s/^/$slave: /" &
-  fi
-  if [ "$SPARK_SLAVE_SLEEP" != "" ]; then
-    sleep $SPARK_SLAVE_SLEEP
-  fi
-done
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-wait
+>&2 echo "This script is deprecated, use workers.sh"
+"${DIR}/workers.sh" "$@"
diff --git a/sbin/spark-daemons.sh b/sbin/spark-daemons.sh
index dec2f44..9a5e5f3 100755
--- a/sbin/spark-daemons.sh
+++ b/sbin/spark-daemons.sh
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Run a Spark command on all slave hosts.
+# Run a Spark command on all worker hosts.
 
 usage="Usage: spark-daemons.sh [--config <conf-dir>] [start|stop] command instance-number args..."
 
@@ -33,4 +33,4 @@ fi
 
 . "${SPARK_HOME}/sbin/spark-config.sh"
 
-exec "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/spark-daemon.sh" "$@"
+exec "${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/spark-daemon.sh" "$@"
diff --git a/sbin/start-all.sh b/sbin/start-all.sh
index a5d30d2..064074e 100755
--- a/sbin/start-all.sh
+++ b/sbin/start-all.sh
@@ -19,7 +19,7 @@
 
 # Start all spark daemons.
 # Starts the master on this node.
-# Starts a worker on each node specified in conf/slaves
+# Starts a worker on each node specified in conf/workers
 
 if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
@@ -32,4 +32,4 @@ fi
 "${SPARK_HOME}/sbin"/start-master.sh
 
 # Start Workers
-"${SPARK_HOME}/sbin"/start-slaves.sh
+"${SPARK_HOME}/sbin"/start-workers.sh
diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 9b3b26b..6868253 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -17,76 +17,7 @@
 # limitations under the License.
 #
 
-# Starts a slave on the machine this script is executed on.
-#
-# Environment Variables
-#
-#   SPARK_WORKER_INSTANCES  The number of worker instances to run on this
-#                           slave.  Default is 1. Note it has been deprecate since Spark 3.0.
-#   SPARK_WORKER_PORT       The base port number for the first worker. If set,
-#                           subsequent workers will increment this number.  If
-#                           unset, Spark will find a valid port number, but
-#                           with no guarantee of a predictable pattern.
-#   SPARK_WORKER_WEBUI_PORT The base port for the web interface of the first
-#                           worker.  Subsequent workers will increment this
-#                           number.  Default is 8081.
-
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-# NOTE: This exact class name is matched downstream by SparkSubmit.
-# Any changes need to be reflected there.
-CLASS="org.apache.spark.deploy.worker.Worker"
-
-if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-slave.sh <master> [options]"
-  pattern="Usage:"
-  pattern+="\|Using Spark's default log4j profile:"
-  pattern+="\|Started daemon with process name"
-  pattern+="\|Registered signal handler for"
-
-  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
-  exit 1
-fi
-
-. "${SPARK_HOME}/sbin/spark-config.sh"
-
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-# First argument should be the master; we need to store it aside because we may
-# need to insert arguments between it and the other arguments
-MASTER=$1
-shift
-
-# Determine desired worker port
-if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
-  SPARK_WORKER_WEBUI_PORT=8081
-fi
-
-# Start up the appropriate number of workers on this machine.
-# quick local function to start a worker
-function start_instance {
-  WORKER_NUM=$1
-  shift
-
-  if [ "$SPARK_WORKER_PORT" = "" ]; then
-    PORT_FLAG=
-    PORT_NUM=
-  else
-    PORT_FLAG="--port"
-    PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
-  fi
-  WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))
-
-  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
-     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
-}
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
-  start_instance 1 "$@"
-else
-  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
-    start_instance $(( 1 + $i )) "$@"
-  done
-fi
+>&2 echo "This script is deprecated, use start-worker.sh"
+"${DIR}/start-worker.sh" "$@"
diff --git a/sbin/start-slaves.sh b/sbin/start-slaves.sh
index f5269df..9b113d9 100755
--- a/sbin/start-slaves.sh
+++ b/sbin/start-slaves.sh
@@ -17,30 +17,7 @@
 # limitations under the License.
 #
 
-# Starts a slave instance on each machine specified in the conf/slaves file.
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-. "${SPARK_HOME}/sbin/spark-config.sh"
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-# Find the port number for the master
-if [ "$SPARK_MASTER_PORT" = "" ]; then
-  SPARK_MASTER_PORT=7077
-fi
-
-if [ "$SPARK_MASTER_HOST" = "" ]; then
-  case `uname` in
-      (SunOS)
-	  SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
-	  ;;
-      (*)
-	  SPARK_MASTER_HOST="`hostname -f`"
-	  ;;
-  esac
-fi
-
-# Launch the slaves
-"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
+>&2 echo "This script is deprecated, use start-workers.sh"
+"${DIR}/start-workers.sh" "$@"
diff --git a/sbin/start-slave.sh b/sbin/start-worker.sh
similarity index 93%
copy from sbin/start-slave.sh
copy to sbin/start-worker.sh
index 9b3b26b..fd58f01 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-worker.sh
@@ -17,12 +17,12 @@
 # limitations under the License.
 #
 
-# Starts a slave on the machine this script is executed on.
+# Starts a worker on the machine this script is executed on.
 #
 # Environment Variables
 #
 #   SPARK_WORKER_INSTANCES  The number of worker instances to run on this
-#                           slave.  Default is 1. Note it has been deprecate since Spark 3.0.
+#                           worker.  Default is 1. Note it has been deprecated since Spark 3.0.
 #   SPARK_WORKER_PORT       The base port number for the first worker. If set,
 #                           subsequent workers will increment this number.  If
 #                           unset, Spark will find a valid port number, but
@@ -40,7 +40,7 @@ fi
 CLASS="org.apache.spark.deploy.worker.Worker"
 
 if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-slave.sh <master> [options]"
+  echo "Usage: ./sbin/start-worker.sh <master> [options]"
   pattern="Usage:"
   pattern+="\|Using Spark's default log4j profile:"
   pattern+="\|Started daemon with process name"
diff --git a/sbin/start-slaves.sh b/sbin/start-workers.sh
similarity index 75%
copy from sbin/start-slaves.sh
copy to sbin/start-workers.sh
index f5269df..3867ef3 100755
--- a/sbin/start-slaves.sh
+++ b/sbin/start-workers.sh
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Starts a slave instance on each machine specified in the conf/slaves file.
+# Starts a worker instance on each machine specified in the conf/workers file.
 
 if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
@@ -34,13 +34,13 @@ fi
 if [ "$SPARK_MASTER_HOST" = "" ]; then
   case `uname` in
       (SunOS)
-	  SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
-	  ;;
+          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
+          ;;
       (*)
-	  SPARK_MASTER_HOST="`hostname -f`"
-	  ;;
+          SPARK_MASTER_HOST="`hostname -f`"
+          ;;
   esac
 fi
 
-# Launch the slaves
-"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
+# Launch the workers
+"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
diff --git a/sbin/stop-all.sh b/sbin/stop-all.sh
index 4e476ca..2c40905 100755
--- a/sbin/stop-all.sh
+++ b/sbin/stop-all.sh
@@ -27,8 +27,8 @@ fi
 # Load the Spark configuration
 . "${SPARK_HOME}/sbin/spark-config.sh"
 
-# Stop the slaves, then the master
-"${SPARK_HOME}/sbin"/stop-slaves.sh
+# Stop the workers, then the master
+"${SPARK_HOME}/sbin"/stop-workers.sh
 "${SPARK_HOME}/sbin"/stop-master.sh
 
 if [ "$1" == "--wait" ]
@@ -36,7 +36,7 @@ then
   printf "Waiting for workers to shut down..."
   while true
   do
-    running=`${SPARK_HOME}/sbin/slaves.sh ps -ef | grep -v grep | grep deploy.worker.Worker`
+    running=`${SPARK_HOME}/sbin/workers.sh ps -ef | grep -v grep | grep deploy.worker.Worker`
     if [ -z "$running" ]
     then
       printf "\nAll workers successfully shut down.\n"
diff --git a/sbin/stop-slave.sh b/sbin/stop-slave.sh
index 685bcf5..71ed299 100755
--- a/sbin/stop-slave.sh
+++ b/sbin/stop-slave.sh
@@ -17,28 +17,7 @@
 # limitations under the License.
 #
 
-# A shell script to stop all workers on a single slave
-#
-# Environment variables
-#
-#   SPARK_WORKER_INSTANCES The number of worker instances that should be
-#                          running on this slave.  Default is 1.
-
-# Usage: stop-slave.sh
-#   Stops all slaves on this worker machine
-
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
-
-. "${SPARK_HOME}/sbin/spark-config.sh"
-
-. "${SPARK_HOME}/bin/load-spark-env.sh"
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
-  "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
-else
-  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
-    "${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
-  done
-fi
+>&2 echo "This script is deprecated, use stop-worker.sh"
+"${DIR}/stop-worker.sh" "$@"
diff --git a/sbin/stop-slaves.sh b/sbin/stop-slaves.sh
index a57441b..c0aca68 100755
--- a/sbin/stop-slaves.sh
+++ b/sbin/stop-slaves.sh
@@ -17,12 +17,7 @@
 # limitations under the License.
 #
 
-if [ -z "${SPARK_HOME}" ]; then
-  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-fi
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
-. "${SPARK_HOME}/sbin/spark-config.sh"
-
-. "${SPARK_HOME}/bin/load-spark-env.sh"
-
-"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin"/stop-slave.sh
+>&2 echo "This script is deprecated, use stop-workers.sh"
+"${DIR}/stop-workers.sh" "$@"
diff --git a/sbin/stop-slave.sh b/sbin/stop-worker.sh
similarity index 87%
copy from sbin/stop-slave.sh
copy to sbin/stop-worker.sh
index 685bcf5..112b62e 100755
--- a/sbin/stop-slave.sh
+++ b/sbin/stop-worker.sh
@@ -17,15 +17,15 @@
 # limitations under the License.
 #
 
-# A shell script to stop all workers on a single slave
+# A shell script to stop all workers on a single worker machine
 #
 # Environment variables
 #
 #   SPARK_WORKER_INSTANCES The number of worker instances that should be
-#                          running on this slave.  Default is 1.
+#                          running on this worker machine.  Default is 1.
 
-# Usage: stop-slave.sh
-#   Stops all slaves on this worker machine
+# Usage: stop-worker.sh
+#   Stops all workers on this worker machine
 
 if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
diff --git a/sbin/stop-slaves.sh b/sbin/stop-workers.sh
similarity index 91%
copy from sbin/stop-slaves.sh
copy to sbin/stop-workers.sh
index a57441b..552800f 100755
--- a/sbin/stop-slaves.sh
+++ b/sbin/stop-workers.sh
@@ -25,4 +25,4 @@ fi
 
 . "${SPARK_HOME}/bin/load-spark-env.sh"
 
-"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin"/stop-slave.sh
+"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin"/stop-worker.sh
diff --git a/sbin/slaves.sh b/sbin/workers.sh
similarity index 62%
copy from sbin/slaves.sh
copy to sbin/workers.sh
index c971aa3..cab0330 100755
--- a/sbin/slaves.sh
+++ b/sbin/workers.sh
@@ -17,18 +17,18 @@
 # limitations under the License.
 #
 
-# Run a shell command on all slave hosts.
+# Run a shell command on all worker hosts.
 #
 # Environment Variables
 #
-#   SPARK_SLAVES    File naming remote hosts.
-#     Default is ${SPARK_CONF_DIR}/slaves.
+#   SPARK_WORKERS    File naming remote hosts.
+#     Default is ${SPARK_CONF_DIR}/workers.
 #   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
-#   SPARK_SLAVE_SLEEP Seconds to sleep between spawning remote commands.
+#   SPARK_WORKER_SLEEP Seconds to sleep between spawning remote commands.
 #   SPARK_SSH_OPTS Options passed to ssh when running remote commands.
 ##
 
-usage="Usage: slaves.sh [--config <conf-dir>] command..."
+usage="Usage: workers.sh [--config <conf-dir>] command..."
 
 # if no args specified, show usage
 if [ $# -le 0 ]; then
@@ -42,13 +42,18 @@ fi
 
 . "${SPARK_HOME}/sbin/spark-config.sh"
 
-# If the slaves file is specified in the command line,
+# If the workers file is specified in the command line,
 # then it takes precedence over the definition in
 # spark-env.sh. Save it here.
+if [ -f "$SPARK_WORKERS" ]; then
+  HOSTLIST=`cat "$SPARK_WORKERS"`
+fi
 if [ -f "$SPARK_SLAVES" ]; then
+  >&2 echo "SPARK_SLAVES is deprecated, use SPARK_WORKERS"
   HOSTLIST=`cat "$SPARK_SLAVES"`
 fi
 
+
 # Check if --config is passed as an argument. It is an optional parameter.
 # Exit if the argument is not a directory.
 if [ "$1" == "--config" ]
@@ -69,14 +74,22 @@ fi
 . "${SPARK_HOME}/bin/load-spark-env.sh"
 
 if [ "$HOSTLIST" = "" ]; then
-  if [ "$SPARK_SLAVES" = "" ]; then
-    if [ -f "${SPARK_CONF_DIR}/slaves" ]; then
+  if [ "$SPARK_SLAVES" = "" ] && [ "$SPARK_WORKERS" = "" ]; then
+    if [ -f "${SPARK_CONF_DIR}/workers" ]; then
+      HOSTLIST=`cat "${SPARK_CONF_DIR}/workers"`
+    elif [ -f "${SPARK_CONF_DIR}/slaves" ]; then
       HOSTLIST=`cat "${SPARK_CONF_DIR}/slaves"`
     else
       HOSTLIST=localhost
     fi
   else
-    HOSTLIST=`cat "${SPARK_SLAVES}"`
+    if [ -f "$SPARK_WORKERS" ]; then
+      HOSTLIST=`cat "$SPARK_WORKERS"`
+    fi
+    if [ -f "$SPARK_SLAVES" ]; then
+      >&2 echo "SPARK_SLAVES is deprecated, use SPARK_WORKERS"
+      HOSTLIST=`cat "$SPARK_SLAVES"`
+    fi
   fi
 fi
 
@@ -87,15 +100,19 @@ if [ "$SPARK_SSH_OPTS" = "" ]; then
   SPARK_SSH_OPTS="-o StrictHostKeyChecking=no"
 fi
 
-for slave in `echo "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
+for host in `echo "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
   if [ -n "${SPARK_SSH_FOREGROUND}" ]; then
-    ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
-      2>&1 | sed "s/^/$slave: /"
+    ssh $SPARK_SSH_OPTS "$host" $"${@// /\\ }" \
+      2>&1 | sed "s/^/$host: /"
   else
-    ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
-      2>&1 | sed "s/^/$slave: /" &
+    ssh $SPARK_SSH_OPTS "$host" $"${@// /\\ }" \
+      2>&1 | sed "s/^/$host: /" &
+  fi
+  if [ "$SPARK_WORKER_SLEEP" != "" ]; then
+    sleep $SPARK_WORKER_SLEEP
   fi
   if [ "$SPARK_SLAVE_SLEEP" != "" ]; then
+    >&2 echo "SPARK_SLAVE_SLEEP is deprecated, use SPARK_WORKER_SLEEP"
     sleep $SPARK_SLAVE_SLEEP
   fi
 done
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 233e622..109b7f4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
-/** A singleton object for the master program. The slaves should not access this. */
+/** A singleton object for the master program. The executors should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
   logDebug("Initializing SparkSQLEnv")
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 8ddb979..3e0d441 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -308,7 +308,7 @@ class HadoopTableReader(
 
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
-   * applied locally on each slave.
+   * applied locally on each executor.
    */
   private def createOldHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
@@ -330,7 +330,7 @@ class HadoopTableReader(
 
   /**
    * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
-   * applied locally on each slave.
+   * applied locally on each executor.
    */
   private def createNewHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
     val newJobConf = new JobConf(hadoopConf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 2d53a1b..7850285 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -67,7 +67,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the
    *                local file system or an HDFS, HTTP, HTTPS, or FTP URL.
    */
@@ -84,7 +84,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
@@ -101,7 +101,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param sparkHome The SPARK_HOME directory on the worker nodes
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 13cf5cc..51c4b0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -415,7 +415,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   }
 
   /**
-   * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
+   * Run the dummy Spark job to ensure that all executors have registered. This avoids all the
    * receivers to be scheduled on the same node.
    *
    * TODO Should poll the executor number and wait for executors according to
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 9cdfdb8..e207dab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -93,7 +93,7 @@ object RawTextHelper {
   }
 
   /**
-   * Warms up the SparkContext in master and slave by running tasks to force JIT kick in
+   * Warms up the SparkContext in master and executor by running tasks to force JIT kick in
    * before real workload starts.
    */
   def warmUp(sc: SparkContext): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org