You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/01/13 18:29:31 UTC

spark git commit: [SPARK-5006][Deploy]spark.port.maxRetries doesn't work

Repository: spark
Updated Branches:
  refs/heads/master 1e42e96ec -> f7741a9a7


[SPARK-5006][Deploy]spark.port.maxRetries doesn't work

https://issues.apache.org/jira/browse/SPARK-5006

I think the issue was introduced in https://github.com/apache/spark/pull/1777.

I have not looked into the Mesos backend yet; it may need the same logic as well.

Author: WangTaoTheTonic <ba...@aliyun.com>
Author: WangTao <ba...@aliyun.com>

Closes #3841 from WangTaoTheTonic/SPARK-5006 and squashes the following commits:

8cdf96d [WangTao] indent thing
2d86d65 [WangTaoTheTonic] fix line length
7cdfd98 [WangTaoTheTonic] fit for new HttpServer constructor
61a370d [WangTaoTheTonic] some minor fixes
bc6e1ec [WangTaoTheTonic] rebase
67bcb46 [WangTaoTheTonic] put conf at 3rd position, modify suite class, add comments
f450cd1 [WangTaoTheTonic] startServiceOnPort will use a SparkConf arg
29b751b [WangTaoTheTonic] rebase as ExecutorRunnableUtil changed to ExecutorRunnable
396c226 [WangTaoTheTonic] make the grammar more like scala
191face [WangTaoTheTonic] invalid value name
62ec336 [WangTaoTheTonic] spark.port.maxRetries doesn't work


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7741a9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7741a9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7741a9a

Branch: refs/heads/master
Commit: f7741a9a72fef23c46f0ad9e1bd16b150967d816
Parents: 1e42e96
Author: WangTaoTheTonic <ba...@aliyun.com>
Authored: Tue Jan 13 09:28:21 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Jan 13 09:29:25 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpFileServer.scala |  3 ++-
 .../scala/org/apache/spark/HttpServer.scala     |  3 ++-
 .../main/scala/org/apache/spark/SparkConf.scala |  6 +++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  3 ++-
 .../spark/network/nio/ConnectionManager.scala   |  2 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 23 ++++++++++----------
 .../streaming/flume/FlumeStreamSuite.scala      |  2 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  3 ++-
 .../org/apache/spark/repl/SparkIMain.scala      |  2 +-
 .../main/scala/org/apache/spark/repl/Main.scala |  2 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  9 +++-----
 14 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index edc3889..677c5e0 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils
 
 private[spark] class HttpFileServer(
+    conf: SparkConf,
     securityManager: SecurityManager,
     requestedPort: Int = 0)
   extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
+    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 912558d..fa22787 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * around a Jetty server.
  */
 private[spark] class HttpServer(
+    conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
     requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
       server = actualServer
       port = actualPort
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index c14764f..a0ce107 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
   }
 
   /**
-   * Return whether the given config is a Spark port config.
+   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = {
+    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 43436a1..4d41803 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -312,7 +312,7 @@ object SparkEnv extends Logging {
     val httpFileServer =
       if (isDriver) {
         val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-        val server = new HttpFileServer(securityManager, fileServerPort)
+        val server = new HttpFileServer(conf, securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri",  server.serverUri)
         server

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31f0a46..31d6958 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server =
+      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 3340fca..03c4137 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
     serverChannel.socket.bind(new InetSocketAddress(port))
     (serverChannel, serverChannel.socket.getLocalPort)
   }
-  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 2a27d49..88fed83 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
       }
     }
 
-    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
     ServerInfo(server, boundPort, collection)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index db2531d..4c9b1e3 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
     val startService: Int => (ActorSystem, Int) = { actualPort =>
       doCreateActorSystem(name, host, actualPort, conf, securityManager)
     }
-    Utils.startServiceOnPort(port, startService, name)
+    Utils.startServiceOnPort(port, startService, conf, name)
   }
 
   private def doCreateActorSystem(

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 578f1a9..2c04e4d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1690,17 +1690,15 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Default maximum number of retries when binding to a port before giving up.
+   * Maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
-    if (sys.props.contains("spark.testing")) {
+  def portMaxRetries(conf: SparkConf): Int = {
+    val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
+    if (conf.contains("spark.testing")) {
       // Set a higher number of retries for tests...
-      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
+      maxRetries.getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      maxRetries.getOrElse(16)
     }
   }
 
@@ -1709,17 +1707,18 @@ private[spark] object Utils extends Logging {
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
+   * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 13943ed..f333e38 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, conf)._2
   }
 
   /** Setup and start the streaming context */

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 30727df..fe53a29 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
 class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
@@ -106,7 +107,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, new SparkConf())._2
   }
 
   def publishData(data: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 646c68e..b646f0b 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -106,7 +106,7 @@ import org.apache.spark.util.Utils
     val virtualDirectory                              = new PlainFile(outputDir) // "directory" for classfiles
     /** Jetty server that will serve our classes to worker nodes */
     val classServerPort                               = conf.getInt("spark.replClassServer.port", 0)
-    val classServer                                   = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
+    val classServer                                   = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
     private var currentSettings: Settings             = initialSettings
     var printResults                                  = true      // whether to print result lines
     var totalSilence                                  = false     // whether to print anything

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5e93a71..69e44d4 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -32,7 +32,7 @@ object Main extends Logging {
   val s = new Settings()
   s.processArguments(List("-Yrepl-class-based",
     "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf))
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
   var sparkContext: SparkContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f7741a9a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ebf5616..c537da9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -148,12 +148,9 @@ class ExecutorRunnable(
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+    sparkConf.getAll
+      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
+      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org