You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/08 08:01:45 UTC

spark git commit: [SPARK-5126][Core] Verify Spark urls before creating Actors so that invalid urls can crash the process.

Repository: spark
Updated Branches:
  refs/heads/master d345ebebd -> 2b729d225


[SPARK-5126][Core] Verify Spark urls before creating Actors so that invalid urls can crash the process.

Because `actorSelection` returns `deadLetters` for an invalid path, a Worker stays silent when given an invalid master url. It's better to log an error so that people can find such problems quickly.

This PR checks the url before passing it to `actorSelection`, and throws and logs a SparkException if the url is invalid.

Author: zsxwing <zs...@gmail.com>

Closes #3927 from zsxwing/SPARK-5126 and squashes the following commits:

9d429ee [zsxwing] Create a utility method in Utils to parse Spark url; verify urls before creating Actors so that invalid urls can crash the process.
8286e51 [zsxwing] Check the url before sending to Akka and log the error if the url is invalid


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b729d22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b729d22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b729d22

Branch: refs/heads/master
Commit: 2b729d22500c682435ef7adde566551b45a3c6e3
Parents: d345ebe
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jan 7 23:01:30 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 7 23:01:30 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +
 .../apache/spark/deploy/client/AppClient.scala  | 22 ++++-----
 .../org/apache/spark/deploy/master/Master.scala | 25 ++++++----
 .../org/apache/spark/deploy/worker/Worker.scala | 21 ++++----
 .../scala/org/apache/spark/util/Utils.scala     | 29 ++++++++++++
 .../spark/deploy/master/MasterSuite.scala       | 50 ++++++++++++++++++++
 6 files changed, 116 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index f2687ce..7c1c831 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -160,6 +160,8 @@ object Client {
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
+    // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
+    Master.toAkkaUrl(driverArgs.master)
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 4efebca..39a7b03 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -47,6 +47,8 @@ private[spark] class AppClient(
     conf: SparkConf)
   extends Logging {
 
+  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
@@ -75,9 +77,9 @@ private[spark] class AppClient(
     }
 
     def tryRegisterAllMasters() {
-      for (masterUrl <- masterUrls) {
-        logInfo("Connecting to master " + masterUrl + "...")
-        val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+      for (masterAkkaUrl <- masterAkkaUrls) {
+        logInfo("Connecting to master " + masterAkkaUrl + "...")
+        val actor = context.actorSelection(masterAkkaUrl)
         actor ! RegisterApplication(appDescription)
       }
     }
@@ -103,20 +105,14 @@ private[spark] class AppClient(
     }
 
     def changeMaster(url: String) {
+      // activeMasterUrl is a valid Spark url since we receive it from master.
       activeMasterUrl = url
       master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(host, port) =>
-          Address("akka.tcp", Master.systemName, host, port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
+      masterAddress = Master.toAkkaAddress(activeMasterUrl)
     }
 
     private def isPossibleMaster(remoteUrl: Address) = {
-      masterUrls.map(s => Master.toAkkaUrl(s))
-        .map(u => AddressFromURIString(u).hostPort)
-        .contains(remoteUrl.hostPort)
+      masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
     }
 
     override def receiveWithLogging = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e8a5cfc..b1c0152 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -845,7 +845,6 @@ private[spark] class Master(
 private[spark] object Master extends Logging {
   val systemName = "sparkMaster"
   private val actorName = "Master"
-  val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
 
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
@@ -855,14 +854,24 @@ private[spark] object Master extends Logging {
     actorSystem.awaitTermination()
   }
 
-  /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+  /**
+   * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
+   *
+   * @throws SparkException if the url is invalid
+   */
   def toAkkaUrl(sparkUrl: String): String = {
-    sparkUrl match {
-      case sparkUrlRegex(host, port) =>
-        "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
-      case _ =>
-        throw new SparkException("Invalid master URL: " + sparkUrl)
-    }
+    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+    "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+  }
+
+  /**
+   * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
+   *
+   * @throws SparkException if the url is invalid
+   */
+  def toAkkaAddress(sparkUrl: String): Address = {
+    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+    Address("akka.tcp", systemName, host, port)
   }
 
   def startSystemAndActor(

http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f0f3da5..1359983 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 /**
-  * @param masterUrls Each url should look like spark://host:port.
+  * @param masterAkkaUrls Each url should be a valid akka url.
   */
 private[spark] class Worker(
     host: String,
@@ -48,7 +48,7 @@ private[spark] class Worker(
     webUiPort: Int,
     cores: Int,
     memory: Int,
-    masterUrls: Array[String],
+    masterAkkaUrls: Array[String],
     actorSystemName: String,
     actorName: String,
     workDirPath: String = null,
@@ -171,15 +171,11 @@ private[spark] class Worker(
   }
 
   def changeMaster(url: String, uiUrl: String) {
+    // activeMasterUrl is a valid Spark url since we receive it from master.
     activeMasterUrl = url
     activeMasterWebUiUrl = uiUrl
     master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-    masterAddress = activeMasterUrl match {
-      case Master.sparkUrlRegex(_host, _port) =>
-        Address("akka.tcp", Master.systemName, _host, _port.toInt)
-      case x =>
-        throw new SparkException("Invalid spark URL: " + x)
-    }
+    masterAddress = Master.toAkkaAddress(activeMasterUrl)
     connected = true
     // Cancel any outstanding re-registration attempts because we found a new master
     registrationRetryTimer.foreach(_.cancel())
@@ -187,9 +183,9 @@ private[spark] class Worker(
   }
 
   private def tryRegisterAllMasters() {
-    for (masterUrl <- masterUrls) {
-      logInfo("Connecting to master " + masterUrl + "...")
-      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+    for (masterAkkaUrl <- masterAkkaUrls) {
+      logInfo("Connecting to master " + masterAkkaUrl + "...")
+      val actor = context.actorSelection(masterAkkaUrl)
       actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
     }
   }
@@ -527,8 +523,9 @@ private[spark] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf, securityManager = securityMgr)
+    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
+      masterAkkaUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
     (actorSystem, boundPort)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0d771ba..9d6b616 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1842,6 +1842,35 @@ private[spark] object Utils extends Logging {
       sparkValue
     }
   }
+
+  /**
+   * Return a pair of host and port extracted from the `sparkUrl`.
+   *
+   * A spark url (`spark://host:port`) is a special URI whose scheme is `spark` and which contains
+   * only a host and a port.
+   *
+   * @throws SparkException if `sparkUrl` is invalid.
+   */
+  def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = {
+    try {
+      val uri = new java.net.URI(sparkUrl)
+      val host = uri.getHost
+      val port = uri.getPort
+      if (uri.getScheme != "spark" ||
+        host == null ||
+        port < 0 ||
+        (uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null
+        uri.getFragment != null ||
+        uri.getQuery != null ||
+        uri.getUserInfo != null) {
+        throw new SparkException("Invalid master URL: " + sparkUrl)
+      }
+      (host, port)
+    } catch {
+      case e: java.net.URISyntaxException =>
+        throw new SparkException("Invalid master URL: " + sparkUrl, e)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2b729d22/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
new file mode 100644
index 0000000..3d2335f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import akka.actor.Address
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkException
+
+class MasterSuite extends FunSuite {
+
+  test("toAkkaUrl") {
+    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
+    assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
+  }
+
+  test("toAkkaUrl: a typo url") {
+    val e = intercept[SparkException] {
+      Master.toAkkaUrl("spark://1.2. 3.4:1234")
+    }
+    assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
+  }
+
+  test("toAkkaAddress") {
+    val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
+    assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
+  }
+
+  test("toAkkaAddress: a typo url") {
+    val e = intercept[SparkException] {
+      Master.toAkkaAddress("spark://1.2. 3.4:1234")
+    }
+    assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org