You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/03 02:28:12 UTC

[2/2] spark git commit: Spark 3883: SSL support for HttpServer and Akka

Spark 3883: SSL support for HttpServer and Akka

SPARK-3883: SSL support for Akka connections and Jetty based file servers.

This story introduced the following changes:
- Introduced SSLOptions object which holds the SSL configuration and can build the appropriate configuration for Akka or Jetty. SSLOptions can be created by parsing SparkConf entries at a specified namespace.
- SSLOptions is created and kept by SecurityManager
- All Akka actor address creation snippets based on interpolated strings were replaced by dedicated methods from AkkaUtils. Those methods select the proper Akka protocol - either akka.tcp or akka.ssl.tcp
- Added test cases for AkkaUtils, FileServer, SSLOptions and SecurityManager
- Added a way for executors and the driver to use node-local SSL configuration in standalone mode. It can be enabled by setting spark.ssl.useNodeLocalConf to true in SparkConf.
- Made CoarseGrainedExecutorBackend not overwrite the settings that are part of the executor startup configuration - they are passed from the Worker anyway

Refer to https://github.com/apache/spark/pull/3571 for discussion and details

Author: Jacek Lewandowski <le...@gmail.com>
Author: Jacek Lewandowski <ja...@datastax.com>

Closes #3571 from jacek-lewandowski/SPARK-3883-master and squashes the following commits:

9ef4ed1 [Jacek Lewandowski] Merge pull request #2 from jacek-lewandowski/SPARK-3883-docs2
fb31b49 [Jacek Lewandowski] SPARK-3883: Added SSL setup documentation
2532668 [Jacek Lewandowski] SPARK-3883: Refactored AkkaUtils.protocol method to not use Try
90a8762 [Jacek Lewandowski] SPARK-3883: Refactored methods to resolve Akka address and made it possible to easily configure multiple communication layers for SSL
72b2541 [Jacek Lewandowski] SPARK-3883: A reference to the fallback SSLOptions can be provided when constructing SSLOptions
93050f4 [Jacek Lewandowski] SPARK-3883: SSL support for HttpServer and Akka


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfea3003
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfea3003
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfea3003

Branch: refs/heads/master
Commit: cfea30037ff4ac7e386a1478e7dce07ca3bb9072
Parents: ef65cf0
Author: Jacek Lewandowski <le...@gmail.com>
Authored: Mon Feb 2 17:18:54 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 2 17:27:26 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpServer.scala     |  11 +-
 .../scala/org/apache/spark/SSLOptions.scala     | 178 +++++++++++++++++
 .../org/apache/spark/SecurityManager.scala      | 100 +++++++++-
 .../main/scala/org/apache/spark/SparkConf.scala |   1 +
 .../apache/spark/broadcast/HttpBroadcast.scala  |   1 +
 .../spark/deploy/ApplicationDescription.scala   |   9 +
 .../scala/org/apache/spark/deploy/Client.scala  |   5 +-
 .../apache/spark/deploy/DriverDescription.scala |   8 +
 .../apache/spark/deploy/client/AppClient.scala  |   7 +-
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  67 +++++--
 .../executor/CoarseGrainedExecutorBackend.scala |  16 +-
 .../cluster/SimrSchedulerBackend.scala          |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   5 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   5 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  36 +++-
 .../scala/org/apache/spark/util/Utils.scala     |  20 +-
 core/src/test/resources/keystore                | Bin 0 -> 2247 bytes
 core/src/test/resources/truststore              | Bin 0 -> 957 bytes
 core/src/test/resources/untrusted-keystore      | Bin 0 -> 2246 bytes
 .../org/apache/spark/FileServerSuite.scala      |  90 +++++++++
 .../apache/spark/MapOutputTrackerSuite.scala    |   2 +-
 .../org/apache/spark/SSLOptionsSuite.scala      | 123 ++++++++++++
 .../org/apache/spark/SSLSampleConfigs.scala     |  55 ++++++
 .../org/apache/spark/SecurityManagerSuite.scala |  50 ++++-
 .../org/apache/spark/deploy/ClientSuite.scala   |   1 +
 .../spark/deploy/master/MasterSuite.scala       |  26 ++-
 .../spark/deploy/worker/WorkerSuite.scala       |  57 ++++++
 .../org/apache/spark/util/AkkaUtilsSuite.scala  | 197 +++++++++++++++++--
 docs/configuration.md                           |  80 ++++++++
 docs/security.md                                |  24 +++
 .../apache/spark/repl/ExecutorClassLoader.scala |  11 +-
 .../receiver/ReceiverSupervisorImpl.scala       |   8 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   4 +-
 .../spark/deploy/yarn/YarnAllocator.scala       |   7 +-
 36 files changed, 1145 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index fa22787..09a9ccc 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
+import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
    */
   private def doStart(startPort: Int): (Server, Int) = {
     val server = new Server()
-    val connector = new SocketConnector
+
+    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
+      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
     connector.setMaxIdleTime(60 * 1000)
     connector.setSoLingerTime(-1)
     connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
   }
 
   /**
-   * Get the URI of this HTTP server (http://host:port)
+   * Get the URI of this HTTP server (http://host:port or https://host:port)
    */
   def uri: String = {
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      "http://" + Utils.localIpAddress + ":" + port
+      val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
+      s"$scheme://${Utils.localIpAddress}:$port"
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SSLOptions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
new file mode 100644
index 0000000..2cdc167
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+/**
+ * SSLOptions class is a common container for SSL configuration options. It offers methods to
+ * generate specific objects to configure SSL for different communication protocols.
+ *
+ * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
+ * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
+ * authentication with SSL, SSLOptions cannot support it either.
+ *
+ * @param enabled             enables or disables SSL; if it is set to false, the rest of the
+ *                            settings are disregarded
+ * @param keyStore            a path to the key-store file
+ * @param keyStorePassword    a password to access the key-store file
+ * @param keyPassword         a password to access the private key in the key-store
+ * @param trustStore          a path to the trust-store file
+ * @param trustStorePassword  a password to access the trust-store file
+ * @param protocol            SSL protocol (remember that SSLv3 was compromised) supported by Java
+ * @param enabledAlgorithms   a set of encryption algorithms to use
+ */
+private[spark] case class SSLOptions(
+    enabled: Boolean = false,
+    keyStore: Option[File] = None,
+    keyStorePassword: Option[String] = None,
+    keyPassword: Option[String] = None,
+    trustStore: Option[File] = None,
+    trustStorePassword: Option[String] = None,
+    protocol: Option[String] = None,
+    enabledAlgorithms: Set[String] = Set.empty) {
+
+  /**
+   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
+   */
+  def createJettySslContextFactory(): Option[SslContextFactory] = {
+    if (enabled) {
+      val sslContextFactory = new SslContextFactory()
+
+      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
+      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
+      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+      protocol.foreach(sslContextFactory.setProtocol)
+      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+
+      Some(sslContextFactory)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Creates an Akka configuration object which contains all the SSL settings represented by this
+   * object. It can be used then to compose the ultimate Akka configuration.
+   */
+  def createAkkaConfig: Option[Config] = {
+    import scala.collection.JavaConversions._
+    if (enabled) {
+      Some(ConfigFactory.empty()
+        .withValue("akka.remote.netty.tcp.security.key-store",
+          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-store-password",
+          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store",
+          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store-password",
+          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-password",
+          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.random-number-generator",
+          ConfigValueFactory.fromAnyRef(""))
+        .withValue("akka.remote.netty.tcp.security.protocol",
+          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
+          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+        .withValue("akka.remote.netty.tcp.enable-ssl",
+          ConfigValueFactory.fromAnyRef(true)))
+    } else {
+      None
+    }
+  }
+
+  /** Returns a string representation of this SSLOptions with all the passwords masked. */
+  override def toString: String = s"SSLOptions{enabled=$enabled, " +
+      s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+      s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+      s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+
+}
+
+private[spark] object SSLOptions extends Logging {
+
+  /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+    *
+    * The following settings are allowed:
+    * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+    * $ - `[ns].keyStorePassword` - a password to the key-store file
+    * $ - `[ns].keyPassword` - a password to the private key
+    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
+    *                         directory
+    * $ - `[ns].trustStorePassword` - a password to the trust-store file
+    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+    *
+    * For a list of protocols and ciphers supported by particular Java versions, you may go to
+    * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
+    * blog page]].
+    *
+    * You can optionally specify the default configuration. If you do, for each setting which is
+    * missing in SparkConf, the corresponding setting is used from the default configuration.
+    *
+    * @param conf Spark configuration object where the settings are collected from
+    * @param ns the namespace name
+    * @param defaults the default configuration
+    * @return [[org.apache.spark.SSLOptions]] object
+    */
+  def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
+    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+        .orElse(defaults.flatMap(_.keyStore))
+
+    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+        .orElse(defaults.flatMap(_.keyStorePassword))
+
+    val keyPassword = conf.getOption(s"$ns.keyPassword")
+        .orElse(defaults.flatMap(_.keyPassword))
+
+    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+        .orElse(defaults.flatMap(_.trustStore))
+
+    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+        .orElse(defaults.flatMap(_.trustStorePassword))
+
+    val protocol = conf.getOption(s"$ns.protocol")
+        .orElse(defaults.flatMap(_.protocol))
+
+    val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
+        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+        .orElse(defaults.map(_.enabledAlgorithms))
+        .getOrElse(Set.empty)
+
+    new SSLOptions(
+      enabled,
+      keyStore,
+      keyStorePassword,
+      keyPassword,
+      trustStore,
+      trustStorePassword,
+      protocol,
+      enabledAlgorithms)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index ec82d09..88d35a4 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
 package org.apache.spark
 
 import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._
 
+import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
  * who always have permission to view or modify the Spark application.
  *
- * Spark does not currently support encryption after authentication.
+ * Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
  *
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:
@@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to connect to the server. There is no control of the underlying
  *            authentication mechanism so its not clear if the password is passed in
  *            plaintext or uses DIGEST-MD5 or some other mechanism.
- *            Akka also has an option to turn on SSL, this option is not currently supported
- *            but we could add a configuration option in the future.
+ *
+ *            Akka also has an option to turn on SSL, this option is currently supported (see
+ *            the details below).
  *
  *  - HTTP for broadcast and file server (via HttpServer) ->  Spark currently uses Jetty
  *            for the HttpServer. Jetty supports multiple authentication mechanisms -
@@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to authenticate using DIGEST-MD5 via a single user and the shared secret.
  *            Since we are using DIGEST-MD5, the shared secret is not passed on the wire
  *            in plaintext.
- *            We currently do not support SSL (https), but Jetty can be configured to use it
- *            so we could add a configuration option for this in the future.
+ *
+ *            We currently support SSL (https) for this communication protocol (see the details
+ *            below).
  *
  *            The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
  *            Any clients must specify the user and password. There is a default
@@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *  authentication. Spark will then use that user to compare against the view acls to do
  *  authorization. If not filter is in place the user is generally null and no authorization
  *  can take place.
+ *
+ *  Connection encryption (SSL) configuration is organized hierarchically. The user can configure
+ *  the default SSL settings which will be used for all the supported communication protocols unless
+ *  they are overwritten by protocol specific settings. This way the user can easily provide the
+ *  common settings for all the protocols without disabling the ability to configure each one
+ *  individually.
+ *
+ *  All the SSL settings like `spark.ssl.xxx` where `xxx` is a particular configuration property,
+ *  denote the global configuration for all the supported protocols. In order to override the global
+ *  configuration for the particular protocol, the properties must be overwritten in the
+ *  protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
+ *  configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
+ *  Akka based connections or `fs` for broadcast and file server.
+ *
+ *  Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
+ *  options that can be specified.
+ *
+ *  SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
+ *  object parses Spark configuration at a given namespace and builds the common representation
+ *  of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
+ *  TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ *
+ *  SSL must be configured on each node and configured for each component involved in
+ *  communication using the particular protocol. In YARN clusters, the key-store can be prepared on
+ *  the client side then distributed and used by the executors as the part of the application
+ *  (YARN allows the user to deploy files before the application is started).
+ *  In standalone deployment, the user needs to provide key-stores and configuration
+ *  options for master and workers. In this mode, the user may allow the executors to use the SSL
+ *  settings inherited from the worker which spawned that executor. It can be accomplished by
+ *  setting `spark.ssl.useNodeLocalConf` to `true`.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+  extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
     )
   }
 
+  // the default SSL configuration - it will be used by all communication layers unless overwritten
+  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
+
+  // SSL configuration for different communication layers - they can override the default
+  // configuration at a specified namespace. The namespace *must* start with spark.ssl.
+  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
+  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+
+  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
+  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+
+  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
+    val trustStoreManagers =
+      for (trustStore <- fileServerSSLOptions.trustStore) yield {
+        val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
+
+        try {
+          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
+
+          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+          tmf.init(ks)
+          tmf.getTrustManagers
+        } finally {
+          input.close()
+        }
+      }
+
+    lazy val credulousTrustStoreManagers = Array({
+      logWarning("Using 'accept-all' trust manager for SSL connections.")
+      new X509TrustManager {
+        override def getAcceptedIssuers: Array[X509Certificate] = null
+
+        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+      }: TrustManager
+    })
+
+    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
+    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+    val hostVerifier = new HostnameVerifier {
+      override def verify(s: String, sslSession: SSLSession): Boolean = true
+    }
+
+    (Some(sslContext.getSocketFactory), Some(hostVerifier))
+  } else {
+    (None, None)
+  }
+
   /**
    * Split a comma separated String, filter out any empty items, and return a Set of strings
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4d4c69d..13aa996 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,6 +370,7 @@ private[spark] object SparkConf {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
     name.startsWith("spark.auth") ||
+    name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index ea98051..1444c0d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
       uc = new URL(url).openConnection()
       uc.setConnectTimeout(httpReadTimeout)
     }
+    Utils.setupSecureURLConnection(uc, securityManager)
 
     val in = {
       uc.setReadTimeout(httpReadTimeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 65a1a8f..ae55b4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(
 
   val user = System.getProperty("user.name", "<unknown>")
 
+  def copy(
+      name: String = name,
+      maxCores: Option[Int] = maxCores,
+      memoryPerSlave: Int = memoryPerSlave,
+      command: Command = command,
+      appUiUrl: String = appUiUrl,
+      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7c1c831..38b3da0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   val timeout = AkkaUtils.askTimeout(conf)
 
   override def preStart() = {
-    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
+    masterActor = context.actorSelection(
+      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
 
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -161,7 +162,7 @@ object Client {
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master)
+    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 58c95dc..b056a19 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
     val command: Command)
   extends Serializable {
 
+  def copy(
+      jarUrl: String = jarUrl,
+      mem: Int = mem,
+      cores: Int = cores,
+      supervise: Boolean = supervise,
+      command: Command = command): DriverDescription =
+    new DriverDescription(jarUrl, mem, cores, supervise, command)
+
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 39a7b03..ffe940f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -47,7 +47,7 @@ private[spark] class AppClient(
     conf: SparkConf)
   extends Logging {
 
-  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
@@ -107,8 +107,9 @@ private[spark] class AppClient(
     def changeMaster(url: String) {
       // activeMasterUrl is a valid Spark url since we receive it from master.
       activeMasterUrl = url
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = Master.toAkkaAddress(activeMasterUrl)
+      master = context.actorSelection(
+        Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem)))
+      masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem))
     }
 
     private def isPossibleMaster(remoteUrl: Address) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d92d993..5eeb9fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -860,9 +860,9 @@ private[spark] object Master extends Logging {
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaUrl(sparkUrl: String): String = {
+  def toAkkaUrl(sparkUrl: String, protocol: String): String = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+    AkkaUtils.address(protocol, systemName, host, port, actorName)
   }
 
   /**
@@ -870,9 +870,9 @@ private[spark] object Master extends Logging {
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaAddress(sparkUrl: String): Address = {
+  def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    Address("akka.tcp", systemName, host, port)
+    Address(protocol, systemName, host, port)
   }
 
   def startSystemAndActor(

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index acbdf0d..bc9f78b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.util.logging.FileAppender
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1359983..b20f5c0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,8 +31,8 @@ import scala.util.Random
 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -93,7 +93,12 @@ private[spark] class Worker(
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
   var activeMasterWebUiUrl : String = ""
-  val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
+  val akkaUrl = AkkaUtils.address(
+    AkkaUtils.protocol(context.system),
+    actorSystemName,
+    host,
+    port,
+    actorName)
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
@@ -174,8 +179,9 @@ private[spark] class Worker(
     // activeMasterUrl it's a valid Spark url since we receive it from master.
     activeMasterUrl = url
     activeMasterWebUiUrl = uiUrl
-    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-    masterAddress = Master.toAkkaAddress(activeMasterUrl)
+    master = context.actorSelection(
+      Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
+    masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
     connected = true
     // Cancel any outstanding re-registration attempts because we found a new master
     registrationRetryTimer.foreach(_.cancel())
@@ -347,10 +353,20 @@ private[spark] class Worker(
             }.toSeq
           }
           appDirectories(appId) = appLocalDirs
-
-          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
-            ExecutorState.LOADING)
+          val manager = new ExecutorRunner(
+            appId,
+            execId,
+            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
+            cores_,
+            memory_,
+            self,
+            workerId,
+            host,
+            sparkHome,
+            executorDir,
+            akkaUrl,
+            conf,
+            appLocalDirs, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -406,7 +422,14 @@ private[spark] class Worker(
 
     case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+      val driver = new DriverRunner(
+        conf,
+        driverId,
+        workDir,
+        sparkHome,
+        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
+        self,
+        akkaUrl)
       drivers(driverId) = driver
       driver.start()
 
@@ -523,10 +546,32 @@ private[spark] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf, securityManager = securityMgr)
-    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
       masterAkkaUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
     (actorSystem, boundPort)
   }
 
+  private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
+    val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
+    val result = cmd.javaOpts.collectFirst {
+      case pattern(_result) => _result.toBoolean
+    }
+    result.getOrElse(false)
+  }
+
+  private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
+    val prefix = "spark.ssl."
+    val useNLC = "spark.ssl.useNodeLocalConf"
+    if (isUseLocalNodeSSLConfig(cmd)) {
+      val newJavaOpts = cmd.javaOpts
+          .filter(opt => !opt.startsWith(s"-D$prefix")) ++
+          conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+
+          s"-D$useNLC=true"
+      cmd.copy(javaOpts = newJavaOpts)
+    } else {
+      cmd
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8238253..bc72c89 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val executorConf = new SparkConf
       val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
+        "driverPropsFetcher",
+        hostname,
+        port,
+        executorConf,
+        new SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       fetcher.shutdown()
 
       // Create SparkEnv using properties we fetched from the driver.
-      val driverConf = new SparkConf().setAll(props)
+      val driverConf = new SparkConf()
+      for ((key, value) <- props) {
+        // keep executor startup conf passed by the Worker; required for SSL in standalone mode
+        if (SparkConf.isExecutorStartupConf(key)) {
+          driverConf.setIfMissing(key, value)
+        } else {
+          driverConf.set(key, value)
+        }
+      }
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ee10aa0..06786a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.AkkaUtils
 
 private[spark] class SimrSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -38,7 +39,8 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       sc.conf.get("spark.driver.host"),
       sc.conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb87a5..d2e1680 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class SparkDeploySchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -46,7 +46,8 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5289661..0d1c2a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
 import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -143,7 +143,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(sc.env.actorSystem),
       SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 4c9b1e3..3d9c619 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Try
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -91,8 +92,11 @@ private[spark] object AkkaUtils extends Logging {
     val secureCookie = if (isAuthOn) secretKey else ""
     logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
 
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
-      ConfigFactory.parseString(
+    val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+        .getOrElse(ConfigFactory.empty())
+
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+      .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -214,7 +218,7 @@ private[spark] object AkkaUtils extends Logging {
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
+    val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
@@ -228,9 +232,33 @@ private[spark] object AkkaUtils extends Logging {
       actorSystem: ActorSystem): ActorRef = {
     val executorActorSystemName = SparkEnv.executorActorSystemName
     Utils.checkHost(host, "Expected hostname")
-    val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+    val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
   }
+
+  def protocol(actorSystem: ActorSystem): String = {
+    val akkaConf = actorSystem.settings.config
+    val sslProp = "akka.remote.netty.tcp.enable-ssl"
+    protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp))
+  }
+
+  def protocol(ssl: Boolean = false): String = {
+    if (ssl) {
+      "akka.ssl.tcp"
+    } else {
+      "akka.tcp"
+    }
+  }
+
+  def address(
+      protocol: String,
+      systemName: String,
+      host: String,
+      port: Any,
+      actorName: String): String = {
+    s"$protocol://$systemName@$host:$port/user/$actorName"
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 31850b5..e9f2aed 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,8 +21,9 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.{Locale, Properties, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import javax.net.ssl.HttpsURLConnection
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -575,6 +576,7 @@ private[spark] object Utils extends Logging {
           logDebug("fetchFile not using security")
           uc = new URL(url).openConnection()
         }
+        Utils.setupSecureURLConnection(uc, securityMgr)
 
         val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
         uc.setConnectTimeout(timeout)
@@ -1820,6 +1822,20 @@ private[spark] object Utils extends Logging {
     PropertyConfigurator.configure(pro)
   }
 
+  /**
+   * If the given URL connection is an HttpsURLConnection, sets its SSL socket factory and
+   * hostname verifier from the given security manager (when defined). Returns the connection.
+   */
+  def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
+    urlConnection match {
+      case https: HttpsURLConnection =>
+        sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
+        sm.hostnameVerifier.foreach(https.setHostnameVerifier)
+        https
+      case connection => connection
+    }
+  }
+
   def invoke(
       clazz: Class[_],
       obj: AnyRef,

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/keystore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/keystore b/core/src/test/resources/keystore
new file mode 100644
index 0000000..f8310e3
Binary files /dev/null and b/core/src/test/resources/keystore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/truststore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/truststore b/core/src/test/resources/truststore
new file mode 100644
index 0000000..a6b1d46
Binary files /dev/null and b/core/src/test/resources/truststore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/untrusted-keystore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/untrusted-keystore b/core/src/test/resources/untrusted-keystore
new file mode 100644
index 0000000..6015b02
Binary files /dev/null and b/core/src/test/resources/untrusted-keystore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 0f49ce4..5fdf6bc 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -18,13 +18,19 @@
 package org.apache.spark
 
 import java.io._
+import java.net.URI
 import java.util.jar.{JarEntry, JarOutputStream}
+import javax.net.ssl.SSLHandshakeException
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.commons.lang3.RandomUtils
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.Utils
 
+import SSLSampleConfigs._
+
 class FileServerSuite extends FunSuite with LocalSparkContext {
 
   @transient var tmpDir: File = _
@@ -168,4 +174,88 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("HttpFileServer should work with SSL") {
+    val sparkConf = sparkSSLConfig()
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      fileTransferTest(server, sm)
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should work with SSL and good credentials") {
+    val sparkConf = sparkSSLConfig()
+    sparkConf.set("spark.authenticate", "true")
+    sparkConf.set("spark.authenticate.secret", "good")
+
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      fileTransferTest(server, sm)
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should not work with valid SSL and bad credentials") {
+    val sparkConf = sparkSSLConfig()
+    sparkConf.set("spark.authenticate", "true")
+    sparkConf.set("spark.authenticate.secret", "bad")
+
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      intercept[IOException] {
+        fileTransferTest(server)
+      }
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should not work with SSL when the server is untrusted") {
+    val sparkConf = sparkSSLConfigUntrusted()
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      intercept[SSLHandshakeException] {
+        fileTransferTest(server)
+      }
+    } finally {
+      server.stop()
+    }
+  }
+
+  def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
+    val randomContent = RandomUtils.nextBytes(100)
+    val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
+    FileUtils.writeByteArrayToFile(file, randomContent)
+    server.addFile(file)
+
+    val uri = new URI(server.serverUri + "/files/" + file.getName)
+
+    val connection = if (sm != null && sm.isAuthenticationEnabled()) {
+      Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
+    } else {
+      uri.toURL.openConnection()
+    }
+
+    if (sm != null) {
+      Utils.setupSecureURLConnection(connection, sm)
+    }
+
+    val buf = IOUtils.toByteArray(connection.getInputStream)
+    assert(buf === randomContent)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d27880f..ccfe067 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -120,7 +120,7 @@ class MapOutputTrackerSuite extends FunSuite {
       securityManager = new SecurityManager(conf))
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
new file mode 100644
index 0000000..444a333
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
+
+  test("test resolving property file as spark conf ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val opts = SSLOptions.parse(conf, "spark.ssl")
+
+    assert(opts.enabled === true)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("password"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+  }
+
+  test("test resolving property with defaults specified ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === true)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("password"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+  }
+
+  test("test whether defaults can be overridden ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ui.ssl.enabled", "false")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ui.ssl.keyStorePassword", "12345")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === false)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("12345"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
new file mode 100644
index 0000000..ace8123
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+object SSLSampleConfigs {
+  val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+  val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
+  val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+  def sparkSSLConfig() = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+    conf.set("spark.ssl.protocol", "TLSv1")
+    conf
+  }
+
+  def sparkSSLConfigUntrusted() = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", untrustedKeyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+    conf.set("spark.ssl.protocol", "TLSv1")
+    conf
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index fcca086..43fbd3f 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.ArrayBuffer
+import java.io.File
 
 import org.scalatest.FunSuite
 
@@ -125,6 +125,54 @@ class SecurityManagerSuite extends FunSuite {
 
   }
 
+  test("ssl on setup") {
+    // Security manager built from the SSL-enabled sample configuration
+    val manager = new SecurityManager(SSLSampleConfigs.sparkSSLConfig())
+    val fileServerOpts = manager.fileServerSSLOptions
+    val akkaOpts = manager.akkaSSLOptions
+
+    assert(fileServerOpts.enabled === true)
+    assert(akkaOpts.enabled === true)
+
+    assert(manager.sslSocketFactory.isDefined === true)
+    assert(manager.hostnameVerifier.isDefined === true)
+
+    // The file server options are checked first, then the Akka options, against the
+    // same expected values.
+    val expectedAlgorithms = Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA")
+    Seq(fileServerOpts, akkaOpts).foreach { opts =>
+      assert(opts.trustStore.isDefined === true)
+      assert(opts.trustStore.get.getName === "truststore")
+      assert(opts.keyStore.isDefined === true)
+      assert(opts.keyStore.get.getName === "keystore")
+      assert(opts.trustStorePassword === Some("password"))
+      assert(opts.keyStorePassword === Some("password"))
+      assert(opts.keyPassword === Some("password"))
+      assert(opts.protocol === Some("TLSv1"))
+      assert(opts.enabledAlgorithms === expectedAlgorithms)
+    }
+  }
+
+  test("ssl off setup") {
+    // Points spark.ssl.configFile at a freshly created temporary file and checks that a
+    // SecurityManager built from a default SparkConf reports SSL as disabled.
+    val file = File.createTempFile("SSLOptionsSuite", "conf")
+    file.deleteOnExit()
+
+    val configFileKey = "spark.ssl.configFile"
+    val previousValue = Option(System.getProperty(configFileKey))
+    System.setProperty(configFileKey, file.getAbsolutePath)
+    try {
+      val conf = new SparkConf()
+
+      val securityManager = new SecurityManager(conf)
+
+      assert(securityManager.fileServerSSLOptions.enabled === false)
+      assert(securityManager.akkaSSLOptions.enabled === false)
+      assert(securityManager.sslSocketFactory.isDefined === false)
+      assert(securityManager.hostnameVerifier.isDefined === false)
+    } finally {
+      // System properties are JVM-wide: restore the previous state so that the setting does
+      // not leak into tests that run later in the same JVM.
+      previousValue match {
+        case Some(value) => System.setProperty(configFileKey, value)
+        case None => System.clearProperty(configFileKey)
+      }
+    }
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
index d2dae34..518073d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.Matchers
 class ClientSuite extends FunSuite with Matchers {
   test("correctly validates driver jar URL's") {
     ClientArguments.isValidJarUrl("http://someHost:8080/foo.jar") should be (true)
+    ClientArguments.isValidJarUrl("https://someHost:8080/foo.jar") should be (true)
 
     // file scheme with authority and path is valid.
     ClientArguments.isValidJarUrl("file://somehost/path/to/a/jarFile.jar") should be (true)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 3d2335f..34c74d8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -20,30 +20,46 @@ package org.apache.spark.deploy.master
 import akka.actor.Address
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SSLOptions, SparkConf, SparkException}
 
 class MasterSuite extends FunSuite {
 
   test("toAkkaUrl") {
-    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
+    // The protocol is an explicit argument; "akka.tcp" selects the non-SSL transport.
+    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.tcp")
     assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
   }
 
+  test("toAkkaUrl with SSL") {
+    // With "akka.ssl.tcp" the resulting URL must start with the SSL protocol name.
+    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.ssl.tcp")
+    assert("akka.ssl.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
+  }
+
   test("toAkkaUrl: a typo url") {
+    // The space inside the host part must be rejected with a SparkException.
     val e = intercept[SparkException] {
-      Master.toAkkaUrl("spark://1.2. 3.4:1234")
+      Master.toAkkaUrl("spark://1.2. 3.4:1234", "akka.tcp")
     }
     assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
   }
 
   test("toAkkaAddress") {
-    val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
+    // The protocol passed in must show up as the protocol of the resulting Address.
+    val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.tcp")
     assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
   }
 
+  test("toAkkaAddress with SSL") {
+    // Same as the plain case, but the Address must carry the "akka.ssl.tcp" protocol.
+    val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.ssl.tcp")
+    assert(Address("akka.ssl.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
+  }
+
   test("toAkkaAddress: a typo url") {
+    // The space inside the host part must be rejected with a SparkException.
     val e = intercept[SparkException] {
-      Master.toAkkaAddress("spark://1.2. 3.4:1234")
+      Master.toAkkaAddress("spark://1.2. 3.4:1234", "akka.tcp")
     }
     assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
new file mode 100644
index 0000000..84e2fd7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.Command
+
+import org.scalatest.{Matchers, FunSuite}
+
+/**
+ * Checks the `Worker` helpers that look for the `spark.ssl.useNodeLocalConf` Java option of
+ * a command and update the command's `spark.ssl.*` Java options from a `SparkConf`.
+ */
+class WorkerSuite extends FunSuite with Matchers {
+
+  /** Creates a `Command` with the given Java options as its last argument; the rest is empty. */
+  def cmd(javaOpts: String*): Command =
+    Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts: _*))
+
+  /** Creates a `SparkConf` with the given entries, without loading the defaults. */
+  def conf(opts: (String, String)*): SparkConf =
+    new SparkConf(loadDefaults = false).setAll(opts)
+
+  test("test isUseLocalNodeSSLConfig") {
+    // Only "=true" is expected to yield true; an unrelated option, "=false" and an empty
+    // value are all expected to yield false.
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=false")) shouldBe false
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=")) shouldBe false
+  }
+
+  test("test maybeUpdateSSLSettings") {
+    // A fresh configuration for every call keeps the three cases independent.
+    def sslConf: SparkConf = conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z")
+
+    // Flag absent: the Java options are expected back unchanged and in the same order.
+    val noFlagOpts = Seq("-Dasdf=dfgh", "-Dspark.ssl.opt1=x")
+    Worker.maybeUpdateSSLSettings(cmd(noFlagOpts: _*), sslConf)
+      .javaOpts should contain theSameElementsInOrderAs noFlagOpts
+
+    // Flag explicitly false: again nothing is expected to change.
+    val flagOffOpts = Seq("-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x")
+    Worker.maybeUpdateSSLSettings(cmd(flagOffOpts: _*), sslConf)
+      .javaOpts should contain theSameElementsInOrderAs flagOffOpts
+
+    // Flag true: the spark.ssl.* values are expected to come from the configuration
+    // (opt1 becomes "y", opt2 is added); the order is not checked here.
+    val flagOnOpts = Seq("-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=x")
+    Worker.maybeUpdateSSLSettings(cmd(flagOnOpts: _*), sslConf)
+      .javaOpts should contain theSameElementsAs Seq(
+        "-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 6bbf72e..39e5d36 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.util.concurrent.TimeoutException
+
 import scala.concurrent.Await
 
 import akka.actor._
@@ -26,6 +28,7 @@ import org.scalatest.FunSuite
 import org.apache.spark._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.SSLSampleConfigs._
 
 
 /**
@@ -47,7 +50,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "true")
@@ -60,7 +63,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = conf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     intercept[akka.actor.ActorNotFound] {
       slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -74,7 +77,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     val conf = new SparkConf
     conf.set("spark.authenticate", "false")
     conf.set("spark.authenticate.secret", "bad")
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -85,18 +88,18 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
     badconf.set("spark.authenticate.secret", "good")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
       conf = badconf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
@@ -124,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     val conf = new SparkConf
     conf.set("spark.authenticate", "true")
     conf.set("spark.authenticate.secret", "good")
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -135,12 +138,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val goodconf = new SparkConf
     goodconf.set("spark.authenticate", "true")
     goodconf.set("spark.authenticate.secret", "good")
-    val securityManagerGood = new SecurityManager(goodconf);
+    val securityManagerGood = new SecurityManager(goodconf)
 
     assert(securityManagerGood.isAuthenticationEnabled() === true)
 
@@ -148,7 +151,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = goodconf, securityManager = securityManagerGood)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
@@ -175,7 +178,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     conf.set("spark.authenticate", "true")
     conf.set("spark.authenticate.secret", "good")
 
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -186,12 +189,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
     badconf.set("spark.authenticate.secret", "bad")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     assert(securityManagerBad.isAuthenticationEnabled() === false)
 
@@ -199,7 +202,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = badconf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     intercept[akka.actor.ActorNotFound] {
       slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -209,4 +212,170 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     slaveSystem.shutdown()
   }
 
+  test("remote fetch ssl on") {
+    // Master and slave both use the sample SSL configuration; authentication stays disabled.
+    val conf = sparkSSLConfig()
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    // try/finally so that the actor systems are shut down even when an assertion fails
+    try {
+      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+      assert(securityManager.isAuthenticationEnabled() === false)
+
+      val masterTracker = new MapOutputTrackerMaster(conf)
+      masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+      val slaveConf = sparkSSLConfig()
+      val slaveSecurityManager = new SecurityManager(slaveConf)
+
+      val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+        conf = slaveConf, securityManager = slaveSecurityManager)
+      try {
+        val slaveTracker = new MapOutputTrackerWorker(conf)
+        val trackerUrl = AkkaUtils.address(
+          AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")
+        val selection = slaveSystem.actorSelection(trackerUrl)
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+        assert(slaveSecurityManager.isAuthenticationEnabled() === false)
+
+        masterTracker.registerShuffle(10, 1)
+        masterTracker.incrementEpoch()
+        slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+        val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+        masterTracker.registerMapOutput(10, 0,
+          MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+        masterTracker.incrementEpoch()
+        slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+        // The fetch must succeed: both ends share the SSL settings and authentication is off
+        assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+          Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+      } finally {
+        slaveSystem.shutdown()
+      }
+    } finally {
+      actorSystem.shutdown()
+    }
+  }
+
+
+  test("remote fetch ssl on and security enabled") {
+    // Master and slave use the sample SSL configuration plus the same authentication secret.
+    val conf = sparkSSLConfig()
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    // try/finally so that the actor systems are shut down even when an assertion fails
+    try {
+      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+      assert(securityManager.isAuthenticationEnabled() === true)
+
+      val masterTracker = new MapOutputTrackerMaster(conf)
+      masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+      val slaveConf = sparkSSLConfig()
+      slaveConf.set("spark.authenticate", "true")
+      slaveConf.set("spark.authenticate.secret", "good")
+      val slaveSecurityManager = new SecurityManager(slaveConf)
+
+      val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+        conf = slaveConf, securityManager = slaveSecurityManager)
+      try {
+        val slaveTracker = new MapOutputTrackerWorker(conf)
+        val trackerUrl = AkkaUtils.address(
+          AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")
+        val selection = slaveSystem.actorSelection(trackerUrl)
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+        assert(slaveSecurityManager.isAuthenticationEnabled() === true)
+
+        masterTracker.registerShuffle(10, 1)
+        masterTracker.incrementEpoch()
+        slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+        val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+        masterTracker.registerMapOutput(10, 0,
+          MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+        masterTracker.incrementEpoch()
+        slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+        // The fetch must succeed: the SSL settings and the secret match on both ends
+        assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+          Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+      } finally {
+        slaveSystem.shutdown()
+      }
+    } finally {
+      actorSystem.shutdown()
+    }
+  }
+
+
+  test("remote fetch ssl on and security enabled - bad credentials") {
+    // Both ends use the sample SSL configuration, but the slave has a different secret.
+    val conf = sparkSSLConfig()
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    // try/finally so that the actor systems are shut down even when an assertion fails
+    try {
+      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+      assert(securityManager.isAuthenticationEnabled() === true)
+
+      val masterTracker = new MapOutputTrackerMaster(conf)
+      masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+      val slaveConf = sparkSSLConfig()
+      slaveConf.set("spark.authenticate", "true")
+      slaveConf.set("spark.authenticate.secret", "bad")
+      val securityManagerBad = new SecurityManager(slaveConf)
+
+      val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+        conf = slaveConf, securityManager = securityManagerBad)
+      try {
+        val slaveTracker = new MapOutputTrackerWorker(conf)
+        val trackerUrl = AkkaUtils.address(
+          AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")
+        val selection = slaveSystem.actorSelection(trackerUrl)
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        // With the wrong secret the master's tracker actor must not be resolvable
+        intercept[akka.actor.ActorNotFound] {
+          slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+        }
+      } finally {
+        slaveSystem.shutdown()
+      }
+    } finally {
+      actorSystem.shutdown()
+    }
+  }
+
+
+  test("remote fetch ssl on - untrusted server") {
+    // The master uses the sample configuration with the untrusted key store, while the slave
+    // uses the regular sample configuration.
+    val conf = sparkSSLConfigUntrusted()
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    // try/finally so that the actor systems are shut down even when an assertion fails
+    try {
+      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+      assert(securityManager.isAuthenticationEnabled() === false)
+
+      val masterTracker = new MapOutputTrackerMaster(conf)
+      masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+      val slaveConf = sparkSSLConfig()
+      val slaveSecurityManager = new SecurityManager(slaveConf)
+
+      val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+        conf = slaveConf, securityManager = slaveSecurityManager)
+      try {
+        val slaveTracker = new MapOutputTrackerWorker(conf)
+        val trackerUrl = AkkaUtils.address(
+          AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")
+        val selection = slaveSystem.actorSelection(trackerUrl)
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        // The lookup is expected to time out rather than resolve the actor - presumably
+        // because the slave does not trust the master's certificate (TODO confirm).
+        intercept[TimeoutException] {
+          slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+        }
+      } finally {
+        slaveSystem.shutdown()
+      }
+    } finally {
+      actorSystem.shutdown()
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org