Posted to commits@spark.apache.org by an...@apache.org on 2015/05/02 03:38:24 UTC

spark git commit: [SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled

Repository: spark
Updated Branches:
  refs/heads/master 202219341 -> b4b43df8a


[SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled

**3/26 update:**
* Akka-based:
  Use an array of `ActorSelection` to represent the multiple masters. Add an `activeMasterActor` for querying the driver's status. Lost masters (including the standby one) are added to `lostMasters`.
  When the size of `lostMasters` is equal to or greater than the number of masters, we report an error that no master is available.

* Rest-based:
  When no master is available over REST (an exception is thrown), we fall back to the Akka gateway to submit apps. A minimal sketch of this failover loop is given after this list.
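
For reference, here is a minimal, self-contained sketch (not the actual Spark code) of the failover loop both gateways follow: try each master in turn, track unreachable ones in a `lostMasters` set, and report an error once every master is lost. The `submitTo` gateway function and the host URLs are hypothetical stand-ins; `parseStandaloneMasterUrls` mirrors the helper this patch adds to `Utils`.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object HaSubmitSketch {
  // Mirrors Utils.parseStandaloneMasterUrls from this patch:
  // "spark://host1,host2" => Array("spark://host1", "spark://host2")
  def parseStandaloneMasterUrls(masterUrls: String): Array[String] =
    masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)

  // Try each master until one handles the request; track lost masters so we
  // can tell the user when none of them is reachable.
  def submitFirstAlive(
      masterUrl: String,
      submitTo: String => Try[String]): Option[String] = {
    val masters = parseStandaloneMasterUrls(masterUrl)
    val lostMasters = new mutable.HashSet[String]
    for (m <- masters) {
      submitTo(m) match {
        case Success(response) =>
          return Some(response) // the active master handled it
        case Failure(e) =>
          println(s"Error connecting to master $m: ${e.getMessage}")
          lostMasters += m
      }
    }
    if (lostMasters.size >= masters.length) {
      println("No master is available, exiting.")
    }
    None
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical gateway: the first master is a dead standby, the second is active.
    val fakeGateway: String => Try[String] = {
      case "spark://host1:7077" =>
        Failure(new java.net.ConnectException("Connection refused"))
      case m =>
        Success(s"driver accepted by $m")
    }
    println(submitFirstAlive("spark://host1:7077,host2:7077", fakeGateway))
  }
}
```

In practice the whole master list is passed as one comma-separated URL, e.g. `--master spark://host1:7077,host2:7077`, which is exactly what `parseStandaloneMasterUrls` splits apart.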

I did a simple test on a standalone HA cluster (with both masters alive, and with one alive/one dead); it worked.

There may remain some issues with style or message printing, but we can review the solution first and then fix them together.

/cc srowen andrewor14

Author: WangTaoTheTonic <wa...@huawei.com>

Closes #5116 from WangTaoTheTonic/SPARK-6443 and squashes the following commits:

2a28aab [WangTaoTheTonic] based the newest change https://github.com/apache/spark/pull/5144
76fd411 [WangTaoTheTonic] rebase
f4f972b [WangTaoTheTonic] rebase...again
a41de0b [WangTaoTheTonic] rebase
220cb3c [WangTaoTheTonic] move connect exception inside
35119a0 [WangTaoTheTonic] style and compile issues
9d636be [WangTaoTheTonic] per Andrew's comments
979760c [WangTaoTheTonic] rebase
e4f4ece [WangTaoTheTonic] fix failed test
5d23958 [WangTaoTheTonic] refact some duplicated code, style and comments
7a881b3 [WangTaoTheTonic] when one of masters is gone, we still can submit
2b011c9 [WangTaoTheTonic] fix broken tests
60d97a4 [WangTaoTheTonic] rebase
fa1fa80 [WangTaoTheTonic] submit app to HA cluster in standalone cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4b43df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4b43df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4b43df8

Branch: refs/heads/master
Commit: b4b43df8a338a30c0eadcf10cbe3ba203dc3f861
Parents: 2022193
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Fri May 1 18:38:20 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 1 18:38:20 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  73 ++++++---
 .../apache/spark/deploy/ClientArguments.scala   |   9 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |   8 +-
 .../org/apache/spark/deploy/master/Master.scala |  24 ++-
 .../deploy/rest/RestSubmissionClient.scala      | 152 ++++++++++++++-----
 .../spark/deploy/worker/WorkerArguments.scala   |   2 +-
 .../scala/org/apache/spark/util/Utils.scala     |  16 ++
 .../deploy/rest/StandaloneRestSubmitSuite.scala |  40 ++---
 8 files changed, 229 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c2c3e9a..848b62f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable.HashSet
 import scala.concurrent._
 
 import akka.actor._
@@ -31,21 +32,24 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
+ *
+ * We currently don't support retry if submission fails. In HA mode, client will submit request to
+ * all masters and see which one could handle it.
  */
 private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   extends Actor with ActorLogReceive with Logging {
 
-  var masterActor: ActorSelection = _
+  private val masterActors = driverArgs.masters.map { m =>
+    context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
+  }
+  private val lostMasters = new HashSet[Address]
+  private var activeMasterActor: ActorSelection = null
+
   val timeout = RpcUtils.askTimeout(conf)
 
   override def preStart(): Unit = {
-    masterActor = context.actorSelection(
-      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
-
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
-    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
-
     driverArgs.cmd match {
       case "launch" =>
         // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -79,11 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
           driverArgs.supervise,
           command)
 
-        masterActor ! RequestSubmitDriver(driverDescription)
+        // This assumes only one Master is active at a time
+        for (masterActor <- masterActors) {
+          masterActor ! RequestSubmitDriver(driverDescription)
+        }
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        masterActor ! RequestKillDriver(driverId)
+        // This assumes only one Master is active at a time
+        for (masterActor <- masterActors) {
+          masterActor ! RequestKillDriver(driverId)
+        }
     }
   }
 
@@ -92,10 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
     println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
-    val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
+    val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
       .mapTo[DriverStatusResponse]
     val statusResponse = Await.result(statusFuture, timeout)
-
     statusResponse.found match {
       case false =>
         println(s"ERROR: Cluster master did not recognize $driverId")
@@ -122,20 +131,46 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)
-      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
+      if (success) {
+        activeMasterActor = context.actorSelection(sender.path)
+        pollAndReportStatus(driverId.get)
+      } else if (!Utils.responseFromBackup(message)) {
+        System.exit(-1)
+      }
+
 
     case KillDriverResponse(driverId, success, message) =>
       println(message)
-      if (success) pollAndReportStatus(driverId) else System.exit(-1)
+      if (success) {
+        activeMasterActor = context.actorSelection(sender.path)
+        pollAndReportStatus(driverId)
+      } else if (!Utils.responseFromBackup(message)) {
+        System.exit(-1)
+      }
 
     case DisassociatedEvent(_, remoteAddress, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      System.exit(-1)
+      if (!lostMasters.contains(remoteAddress)) {
+        println(s"Error connecting to master $remoteAddress.")
+        lostMasters += remoteAddress
+        // Note that this heuristic does not account for the fact that a Master can recover within
+        // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
+        // is not currently a concern, however, because this client does not retry submissions.
+        if (lostMasters.size >= masterActors.size) {
+          println("No master is available, exiting.")
+          System.exit(-1)
+        }
+      }
 
     case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      println(s"Cause was: $cause")
-      System.exit(-1)
+      if (!lostMasters.contains(remoteAddress)) {
+        println(s"Error connecting to master ($remoteAddress).")
+        println(s"Cause was: $cause")
+        lostMasters += remoteAddress
+        if (lostMasters.size >= masterActors.size) {
+          println("No master is available, exiting.")
+          System.exit(-1)
+        }
+      }
   }
 }
 
@@ -163,7 +198,9 @@ object Client {
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
+    for (m <- driverArgs.masters) {
+      Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
+    }
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 5cbac78..316e2d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException}
 import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
-
-import org.apache.spark.util.{IntParam, MemoryParam}
+import org.apache.spark.util.{IntParam, MemoryParam, Utils}
 
 /**
  * Command-line parser for the driver client.
@@ -35,7 +34,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
   var logLevel = Level.WARN
 
   // launch parameters
-  var master: String = ""
+  var masters: Array[String] = null
   var jarUrl: String = ""
   var mainClass: String = ""
   var supervise: Boolean = DEFAULT_SUPERVISE
@@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
       }
 
       jarUrl = _jarUrl
-      master = _master
+      masters = Utils.parseStandaloneMasterUrls(_master)
       mainClass = _mainClass
       _driverOptions ++= tail
 
     case "kill" :: _master :: _driverId :: tail =>
       cmd = "kill"
-      master = _master
+      masters = Utils.parseStandaloneMasterUrls(_master)
       driverId = _driverId
 
     case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index af38bf8..42b5d41 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -118,8 +118,8 @@ object SparkSubmit {
    * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient()
-      .killSubmission(args.master, args.submissionToKill)
+    new RestSubmissionClient(args.master)
+      .killSubmission(args.submissionToKill)
   }
 
   /**
@@ -127,8 +127,8 @@ object SparkSubmit {
    * Standalone and Mesos cluster mode only.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient()
-      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+    new RestSubmissionClient(args.master)
+      .requestSubmissionStatus(args.submissionToRequestStatusFor)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dc6077f..0fac3cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -254,7 +254,8 @@ private[master] class Master(
 
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
-        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          "Can only accept driver submissions in ALIVE state."
         sender ! SubmitDriverResponse(false, None, msg)
       } else {
         logInfo("Driver submitted " + description.command.mainClass)
@@ -274,7 +275,8 @@ private[master] class Master(
 
     case RequestKillDriver(driverId) => {
       if (state != RecoveryState.ALIVE) {
-        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          s"Can only kill drivers in ALIVE state."
         sender ! KillDriverResponse(driverId, success = false, msg)
       } else {
         logInfo("Asked to kill driver " + driverId)
@@ -305,12 +307,18 @@ private[master] class Master(
     }
 
     case RequestDriverStatus(driverId) => {
-      (drivers ++ completedDrivers).find(_.id == driverId) match {
-        case Some(driver) =>
-          sender ! DriverStatusResponse(found = true, Some(driver.state),
-            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
-        case None =>
-          sender ! DriverStatusResponse(found = false, None, None, None, None)
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          "Can only request driver status in ALIVE state."
+        sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
+      } else {
+        (drivers ++ completedDrivers).find(_.id == driverId) match {
+          case Some(driver) =>
+            sender ! DriverStatusResponse(found = true, Some(driver.state),
+              driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
+          case None =>
+            sender ! DriverStatusResponse(found = false, None, None, None, None)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 307cebf..6078f50 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.deploy.rest
 
 import java.io.{DataOutputStream, FileNotFoundException}
-import java.net.{HttpURLConnection, SocketException, URL}
+import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
 import javax.servlet.http.HttpServletResponse
 
+import scala.collection.mutable
 import scala.io.Source
 
 import com.fasterxml.jackson.core.JsonProcessingException
@@ -51,57 +52,109 @@ import org.apache.spark.util.Utils
  * implementation of this client can use that information to retry using the version specified
  * by the server.
  */
-private[spark] class RestSubmissionClient extends Logging {
+private[spark] class RestSubmissionClient(master: String) extends Logging {
   import RestSubmissionClient._
 
   private val supportedMasterPrefixes = Seq("spark://", "mesos://")
 
+  private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master)
+
+  // Set of masters that lost contact with us, used to keep track of
+  // whether there are masters still alive for us to communicate with
+  private val lostMasters = new mutable.HashSet[String]
+
   /**
    * Submit an application specified by the parameters in the provided request.
    *
    * If the submission was successful, poll the status of the submission and report
    * it to the user. Otherwise, report the error message provided by the server.
    */
-  def createSubmission(
-      master: String,
-      request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
+  def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request to launch an application in $master.")
-    validateMaster(master)
-    val url = getSubmitUrl(master)
-    val response = postJson(url, request.toJson)
-    response match {
-      case s: CreateSubmissionResponse =>
-        reportSubmissionStatus(master, s)
-        handleRestResponse(s)
-      case unexpected =>
-        handleUnexpectedRestResponse(unexpected)
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getSubmitUrl(m)
+      try {
+        response = postJson(url, request.toJson)
+        response match {
+          case s: CreateSubmissionResponse =>
+            if (s.success) {
+              reportSubmissionStatus(s)
+              handleRestResponse(s)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
 
   /** Request that the server kill the specified submission. */
-  def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
+  def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request to kill submission $submissionId in $master.")
-    validateMaster(master)
-    val response = post(getKillUrl(master, submissionId))
-    response match {
-      case k: KillSubmissionResponse => handleRestResponse(k)
-      case unexpected => handleUnexpectedRestResponse(unexpected)
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getKillUrl(m, submissionId)
+      try {
+        response = post(url)
+        response match {
+          case k: KillSubmissionResponse =>
+            if (!Utils.responseFromBackup(k.message)) {
+              handleRestResponse(k)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
 
   /** Request the status of a submission from the server. */
   def requestSubmissionStatus(
-      master: String,
       submissionId: String,
       quiet: Boolean = false): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
-    validateMaster(master)
-    val response = get(getStatusUrl(master, submissionId))
-    response match {
-      case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
-      case unexpected => handleUnexpectedRestResponse(unexpected)
+
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getStatusUrl(m, submissionId)
+      try {
+        response = get(url)
+        response match {
+          case s: SubmissionStatusResponse if s.success =>
+            if (!quiet) {
+              handleRestResponse(s)
+            }
+            handled = true
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
@@ -148,11 +201,16 @@ private[spark] class RestSubmissionClient extends Logging {
     conn.setRequestProperty("Content-Type", "application/json")
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
-    val out = new DataOutputStream(conn.getOutputStream)
-    Utils.tryWithSafeFinally {
-      out.write(json.getBytes(Charsets.UTF_8))
-    } {
-      out.close()
+    try {
+      val out = new DataOutputStream(conn.getOutputStream)
+      Utils.tryWithSafeFinally {
+        out.write(json.getBytes(Charsets.UTF_8))
+      } {
+        out.close()
+      }
+    } catch {
+      case e: ConnectException =>
+        throw new SubmitRestConnectionException("Connect Exception when connect to server", e)
     }
     readResponse(conn)
   }
@@ -191,11 +249,9 @@ private[spark] class RestSubmissionClient extends Logging {
       }
     } catch {
       case unreachable @ (_: FileNotFoundException | _: SocketException) =>
-        throw new SubmitRestConnectionException(
-          s"Unable to connect to server ${connection.getURL}", unreachable)
+        throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
       case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
-        throw new SubmitRestProtocolException(
-          "Malformed response received from server", malformed)
+        throw new SubmitRestProtocolException("Malformed response received from server", malformed)
     }
   }
 
@@ -241,13 +297,12 @@ private[spark] class RestSubmissionClient extends Logging {
 
   /** Report the status of a newly created submission. */
   private def reportSubmissionStatus(
-      master: String,
       submitResponse: CreateSubmissionResponse): Unit = {
     if (submitResponse.success) {
       val submissionId = submitResponse.submissionId
       if (submissionId != null) {
         logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
-        pollSubmissionStatus(master, submissionId)
+        pollSubmissionStatus(submissionId)
       } else {
         // should never happen
         logError("Application successfully submitted, but submission ID was not provided!")
@@ -262,9 +317,9 @@ private[spark] class RestSubmissionClient extends Logging {
    * Poll the status of the specified submission and log it.
    * This retries up to a fixed number of times before giving up.
    */
-  private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
+  private def pollSubmissionStatus(submissionId: String): Unit = {
     (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
-      val response = requestSubmissionStatus(master, submissionId, quiet = true)
+      val response = requestSubmissionStatus(submissionId, quiet = true)
       val statusResponse = response match {
         case s: SubmissionStatusResponse => s
         case _ => return // unexpected type, let upstream caller handle it
@@ -302,6 +357,21 @@ private[spark] class RestSubmissionClient extends Logging {
   private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
     logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
   }
+
+  /**
+   * When a connection exception is caught, return true if all masters are lost.
+   * Note that the heuristic used here does not take into account that masters
+   * can recover during the lifetime of this client. This assumption should be
+   * harmless because this client currently does not support retrying submission
+   * on failure yet (SPARK-6443).
+   */
+  private def handleConnectionException(masterUrl: String): Boolean = {
+    if (!lostMasters.contains(masterUrl)) {
+      logWarning(s"Unable to connect to server ${masterUrl}.")
+      lostMasters += masterUrl
+    }
+    lostMasters.size >= masters.size
+  }
 }
 
 private[spark] object RestSubmissionClient {
@@ -324,10 +394,10 @@ private[spark] object RestSubmissionClient {
     }
     val sparkProperties = conf.getAll.toMap
     val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
-    val client = new RestSubmissionClient
+    val client = new RestSubmissionClient(master)
     val submitRequest = client.constructSubmitRequest(
       appResource, mainClass, appArgs, sparkProperties, environmentVariables)
-    client.createSubmission(master, submitRequest)
+    client.createSubmission(submitRequest)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 88f9d88..9678631 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
       if (masters != null) {  // Two positional arguments were given
         printUsageAndExit(1)
       }
-      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
+      masters = Utils.parseStandaloneMasterUrls(value)
       parse(tail)
 
     case Nil =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 844f0cd..be4db02 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2160,6 +2160,22 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Split the comma delimited string of master URLs into a list.
+   * For instance, "spark://abc,def" becomes [spark://abc, spark://def].
+   */
+  def parseStandaloneMasterUrls(masterUrls: String): Array[String] = {
+    masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)
+  }
+
+  /** An identifier that backup masters use in their responses. */
+  val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"
+
+  /** Return true if the response message is sent from a backup Master on standby. */
+  def responseFromBackup(msg: String): Boolean = {
+    msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
+  }
+
+  /**
    * Adds a shutdown hook with default priority.
    *
    * @param hook The code to run during shutdown.

http://git-wip-us.apache.org/repos/asf/spark/blob/b4b43df8/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 0a318a2..f4d548d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._
  * Tests for the REST application submission protocol used in standalone cluster mode.
  */
 class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
-  private val client = new RestSubmissionClient
   private var actorSystem: Option[ActorSystem] = None
   private var server: Option[RestSubmissionServer] = None
 
@@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val appArgs = Array("one", "two", "three")
     val sparkProperties = Map("spark.app.name" -> "pi")
     val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX")
-    val request = client.constructSubmitRequest(
+    val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest(
       "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables)
     assert(request.action === Utils.getFormattedClassName(request))
     assert(request.clientSparkVersion === SPARK_VERSION)
@@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val request = constructSubmitRequest(masterUrl, appArgs)
     assert(request.appArgs === appArgs)
     assert(request.sparkProperties("spark.master") === masterUrl)
-    val response = client.createSubmission(masterUrl, request)
+    val response = new RestSubmissionClient(masterUrl).createSubmission(request)
     val submitResponse = getSubmitResponse(response)
     assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
     assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionId = "my-lyft-driver"
     val killMessage = "your driver is killed"
     val masterUrl = startDummyServer(killMessage = killMessage)
-    val response = client.killSubmission(masterUrl, submissionId)
+    val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId)
     val killResponse = getKillResponse(response)
     assert(killResponse.action === Utils.getFormattedClassName(killResponse))
     assert(killResponse.serverSparkVersion === SPARK_VERSION)
@@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionState = KILLED
     val submissionException = new Exception("there was an irresponsible mix of alcohol and cars")
     val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException))
-    val response = client.requestSubmissionStatus(masterUrl, submissionId)
+    val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId)
     val statusResponse = getStatusResponse(response)
     assert(statusResponse.action === Utils.getFormattedClassName(statusResponse))
     assert(statusResponse.serverSparkVersion === SPARK_VERSION)
@@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then kill") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
     val submitResponse = getSubmitResponse(response1)
     assert(submitResponse.success)
     assert(submitResponse.submissionId != null)
     // kill submission that was just created
     val submissionId = submitResponse.submissionId
-    val response2 = client.killSubmission(masterUrl, submissionId)
+    val response2 = client.killSubmission(submissionId)
     val killResponse = getKillResponse(response2)
     assert(killResponse.success)
     assert(killResponse.submissionId === submissionId)
@@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then request status") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
     val submitResponse = getSubmitResponse(response1)
     assert(submitResponse.success)
     assert(submitResponse.submissionId != null)
     // request status of submission that was just created
     val submissionId = submitResponse.submissionId
-    val response2 = client.requestSubmissionStatus(masterUrl, submissionId)
+    val response2 = client.requestSubmissionStatus(submissionId)
     val statusResponse = getStatusResponse(response2)
     assert(statusResponse.success)
     assert(statusResponse.submissionId === submissionId)
@@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then kill then request status") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
-    val response2 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
+    val response2 = client.createSubmission(request)
     val submitResponse1 = getSubmitResponse(response1)
     val submitResponse2 = getSubmitResponse(response2)
     assert(submitResponse1.success)
@@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionId1 = submitResponse1.submissionId
     val submissionId2 = submitResponse2.submissionId
     // kill only submission 1, but not submission 2
-    val response3 = client.killSubmission(masterUrl, submissionId1)
+    val response3 = client.killSubmission(submissionId1)
     val killResponse = getKillResponse(response3)
     assert(killResponse.success)
     assert(killResponse.submissionId === submissionId1)
     // request status for both submissions: 1 should be KILLED but 2 should be RUNNING still
-    val response4 = client.requestSubmissionStatus(masterUrl, submissionId1)
-    val response5 = client.requestSubmissionStatus(masterUrl, submissionId2)
+    val response4 = client.requestSubmissionStatus(submissionId1)
+    val response5 = client.requestSubmissionStatus(submissionId2)
     val statusResponse1 = getStatusResponse(response4)
     val statusResponse2 = getStatusResponse(response5)
     assert(statusResponse1.submissionId === submissionId1)
@@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("kill or request status before create") {
     val masterUrl = startSmartServer()
     val doesNotExist = "does-not-exist"
+    val client = new RestSubmissionClient(masterUrl)
     // kill a non-existent submission
-    val response1 = client.killSubmission(masterUrl, doesNotExist)
+    val response1 = client.killSubmission(doesNotExist)
     val killResponse = getKillResponse(response1)
     assert(!killResponse.success)
     assert(killResponse.submissionId === doesNotExist)
     // request status for a non-existent submission
-    val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist)
+    val response2 = client.requestSubmissionStatus(doesNotExist)
     val statusResponse = getStatusResponse(response2)
     assert(!statusResponse.success)
     assert(statusResponse.submissionId === doesNotExist)
@@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
 
   test("client handles faulty server") {
     val masterUrl = startFaultyServer()
+    val client = new RestSubmissionClient(masterUrl)
     val httpUrl = masterUrl.replace("spark://", "http://")
     val v = RestSubmissionServer.PROTOCOL_VERSION
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
@@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
       mainJar) ++ appArgs
     val args = new SparkSubmitArguments(commandLineArgs)
     val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
-    client.constructSubmitRequest(
+    new RestSubmissionClient("spark://host:port").constructSubmitRequest(
       mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty)
   }
 
@@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
       method: String,
       body: String = ""): (SubmitRestProtocolResponse, Int) = {
     val conn = sendHttpRequest(url, method, body)
-    (client.readResponse(conn), conn.getResponseCode)
+    (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode)
   }
 }
 

