Posted to commits@spark.apache.org by an...@apache.org on 2015/10/20 01:14:57 UTC

spark git commit: [SPARK-11131][CORE] Fix race in worker registration protocol.

Repository: spark
Updated Branches:
  refs/heads/master 67582132b -> 7ab0ce650


[SPARK-11131][CORE] Fix race in worker registration protocol.

Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the worker was
susceptible to receiving an executor request before it received a
message from the master saying registration succeeded.
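
As a rough illustration of the race, here is a self-contained Scala
sketch. The names (RegistrationSketch, receive, registerWithMaster,
"LaunchExecutor") are toy stand-ins, not Spark's actual RpcEndpointRef
API: with fire-and-forget send, the registration outcome is just one
more unordered message, while with ask the worker gets a Future tied to
that specific request.

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    sealed trait RegisterWorkerResponse
    case object RegisteredWorker extends RegisterWorkerResponse
    case class RegisterWorkerFailed(reason: String) extends RegisterWorkerResponse

    object RegistrationSketch extends App {
      @volatile var registered = false

      // Old shape: the registration outcome is just one more unordered
      // message, so an executor request can be processed before it.
      def receive(msg: Any): Unit = msg match {
        case RegisteredWorker => registered = true
        case "LaunchExecutor" =>
          if (!registered) println("BUG: executor request before registration")
        case _ => ()
      }

      // New shape: the reply is a Future bound to this specific request;
      // the worker marks itself registered in the completion callback.
      def registerWithMaster(ask: => Future[RegisterWorkerResponse]): Unit =
        ask.onComplete {
          case Success(RegisteredWorker) => registered = true
          case Success(RegisterWorkerFailed(why)) => println(s"registration failed: $why")
          case Failure(e) => println(s"cannot reach master: $e")
        }

      registerWithMaster(Future.successful(RegisteredWorker))
      Thread.sleep(100) // give the toy callback time to run
      receive("LaunchExecutor") // safe now: registered is true
    }

The actual patch takes the same shape: the Worker calls
RpcEndpointRef.ask[RegisterWorkerResponse] and handles the reply in a
synchronized handleRegisterResponse, as shown in the Worker.scala hunk
below.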

On top of the above, the change also fixes a ClassCastException that
occurred when registration failed, and which also affected the executor
registration protocol. Because the `ask` was issued with a specific
return type, if an error message (of a different type) was returned
instead, the code would just die with an exception. This is fixed by
introducing a common base trait for these reply messages.
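
A minimal sketch of the failure mode and the fix, using an illustrative
stand-in for `ask` rather than Spark's real RpcEndpointRef: because of
type erasure, the cast to the requested type is only enforced where the
caller consumes the reply, so a failure message of a different type
surfaces there as a ClassCastException; asking for the common base
trait and pattern-matching avoids it.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    sealed trait RegisterExecutorResponse
    case object RegisteredExecutor extends RegisterExecutorResponse
    case class RegisterExecutorFailed(message: String) extends RegisterExecutorResponse

    object CastSketch extends App {
      // Stand-in for RpcEndpointRef.ask[T] (illustrative only): erasure makes
      // the cast to T unchecked here; it is enforced where the value is used.
      def ask[T](reply: Any): Future[T] = Future.successful(reply.asInstanceOf[T])

      // Before the fix: ask for the success type exactly, get a failure back.
      try {
        val reply: RegisteredExecutor.type =
          Await.result(ask[RegisteredExecutor.type](RegisterExecutorFailed("no slots")), 1.second)
        println(reply) // never reached
      } catch {
        case e: ClassCastException => println(s"old protocol dies: $e")
      }

      // After the fix: ask for the common base trait and pattern-match.
      Await.result(ask[RegisterExecutorResponse](RegisterExecutorFailed("no slots")), 1.second) match {
        case RegisteredExecutor => println("registered")
        case RegisterExecutorFailed(message) => println(s"failed: $message")
      }
    }

This is exactly the shape of the sealed RegisterWorkerResponse and
RegisterExecutorResponse traits introduced in the diff below.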

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #9138 from vanzin/SPARK-11131.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ab0ce65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ab0ce65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ab0ce65

Branch: refs/heads/master
Commit: 7ab0ce6501c37f0fc3a49e3332573ae4e4def3e8
Parents: 6758213
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon Oct 19 16:14:50 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Oct 19 16:14:50 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  7 +-
 .../org/apache/spark/deploy/master/Master.scala | 50 +++++++-------
 .../org/apache/spark/deploy/worker/Worker.scala | 73 +++++++++++++-------
 .../executor/CoarseGrainedExecutorBackend.scala |  4 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |  4 ++
 .../apache/spark/HeartbeatReceiverSuite.scala   |  4 +-
 6 files changed, 86 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d8084a5..3feb7ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {
 
   // Master to Worker
 
+  sealed trait RegisterWorkerResponse
+
   case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+    with RegisterWorkerResponse
+
+  case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
 
-  case class RegisterWorkerFailed(message: String) extends DeployMessage
+  case object MasterInStandby extends DeployMessage with RegisterWorkerResponse
 
   case class ReconnectWorker(masterUrl: String) extends DeployMessage
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d518e92..6715d6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,31 +233,6 @@ private[deploy] class Master(
       System.exit(0)
     }
 
-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        // ignore, don't send response
-      } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerUiPort, publicAddress)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same " +
-            "address: " + workerAddress)
-          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
-            + workerAddress))
-        }
-      }
-    }
-
     case RegisterApplication(description, driver) => {
       // TODO Prevent repeated registrations from some driver
       if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ private[deploy] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterWorker(
+        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        context.reply(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerUiPort, publicAddress)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          context.reply(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress))
+        }
+      }
+    }
+
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 93a1b3f..a45867e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.Random
+import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -213,8 +213,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -271,8 +270,7 @@ private[deploy] class Worker(
                   logInfo("Connecting to master " + masterAddress + "...")
                   val masterEndpoint =
                     rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-                  masterEndpoint.send(RegisterWorker(
-                    workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+                  registerWithMaster(masterEndpoint)
                 } catch {
                   case ie: InterruptedException => // Cancelled
                   case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,25 +339,54 @@ private[deploy] class Worker(
     }
   }
 
-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, masterWebUiUrl) =>
-      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
-      registered = true
-      changeMaster(masterRef, masterWebUiUrl)
-      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-      if (CLEANUP_ENABLED) {
-        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
+    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
+      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+      .onComplete {
+        // This is a very fast action so we can use "ThreadUtils.sameThread"
+        case Success(msg) =>
+          Utils.tryLogNonFatalError {
+            handleRegisterResponse(msg)
+          }
+        case Failure(e) =>
+          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
+          System.exit(1)
+      }(ThreadUtils.sameThread)
+  }
+
+  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
+    msg match {
+      case RegisteredWorker(masterRef, masterWebUiUrl) =>
+        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+        registered = true
+        changeMaster(masterRef, masterWebUiUrl)
         forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
+            self.send(SendHeartbeat)
           }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
-      }
+        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+        if (CLEANUP_ENABLED) {
+          logInfo(
+            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(WorkDirCleanup)
+            }
+          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        }
 
+      case RegisterWorkerFailed(message) =>
+        if (!registered) {
+          logError("Worker registration failed: " + message)
+          System.exit(1)
+        }
+
+      case MasterInStandby =>
+        // Ignore. Master not yet ready.
+    }
+  }
+
+  override def receive: PartialFunction[Any, Unit] = synchronized {
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
 
@@ -399,12 +426,6 @@ private[deploy] class Worker(
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
       masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
 
-    case RegisterWorkerFailed(message) =>
-      if (!registered) {
-        logError("Worker registration failed: " + message)
-        System.exit(1)
-      }
-
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 49059de..a9c6a05 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisteredExecutor.type](
+      ref.ask[RegisterExecutorResponse](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e0d25dc..4652df3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -36,9 +36,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
+  sealed trait RegisterExecutorResponse
+
   case object RegisteredExecutor extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   // Executors to driver
   case class RegisterExecutor(

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab0ce65/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18f2229..3cd80c0 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -173,9 +173,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)

