You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/03 02:28:11 UTC

[1/2] spark git commit: Spark 3883: SSL support for HttpServer and Akka

Repository: spark
Updated Branches:
  refs/heads/master ef65cf09b -> cfea30037


http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 08c6bef..62d3fca 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1242,6 +1242,86 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 </table>
 
+#### Encryption
+
+<table class="table">
+    <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+    <tr>
+        <td><code>spark.ssl.enabled</code></td>
+        <td>false</td>
+        <td>
+            <p>Whether to enable SSL connections on all supported protocols.</p>
+
+            <p>All the SSL settings like <code>spark.ssl.xxx</code> where <code>xxx</code> is a
+            particular configuration property, denote the global configuration for all the supported
+            protocols. In order to override the global configuration for the particular protocol,
+            the properties must be overwritten in the protocol-specific namespace.</p>
+
+            <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
+            particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
+            either <code>akka</code> for Akka based connections or <code>fs</code> for broadcast and
+            file server.</p>
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.keyStore</code></td>
+        <td>None</td>
+        <td>
+            A path to a key-store file. The path can be absolute or relative to the directory where
+            the component is started in.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.keyStorePassword</code></td>
+        <td>None</td>
+        <td>
+            A password to the key-store.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.keyPassword</code></td>
+        <td>None</td>
+        <td>
+            A password to the private key in key-store.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.trustStore</code></td>
+        <td>None</td>
+        <td>
+            A path to a trust-store file. The path can be absolute or relative to the directory
+            where the component is started in.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.trustStorePassword</code></td>
+        <td>None</td>
+        <td>
+            A password to the trust-store.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.protocol</code></td>
+        <td>None</td>
+        <td>
+            A protocol name. The protocol must be supported by JVM. The reference list of protocols
+            one can find on <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a>
+            page.
+        </td>
+    </tr>
+    <tr>
+        <td><code>spark.ssl.enabledAlgorithms</code></td>
+        <td>Empty</td>
+        <td>
+            A comma separated list of ciphers. The specified ciphers must be supported by JVM.
+            The reference list of protocols one can find on
+            <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a>
+            page.
+        </td>
+    </tr>
+</table>
+
+
 #### Spark Streaming
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 1e206a1..6e0a54f 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -20,6 +20,30 @@ Spark allows for a set of administrators to be specified in the acls who always
 
 If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
 
+## Encryption
+
+Spark supports SSL for Akka and HTTP (for broadcast and file server) protocols. However SSL is not supported yet for WebUI and block transfer service.
+
+Connection encryption (SSL) configuration is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
+
+SSL must be configured on each node and configured for each component involved in communication using the particular protocol.
+
+### YARN mode
+The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
+
+### Standalone mode
+The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
+
+### Preparing the key-stores
+Key-stores can be generated by `keytool` program. The reference documentation for this tool is
+[here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). The most basic
+steps to configure the key-stores and the trust-store for the standalone deployment mode is as
+follows:
+* Generate a keys pair for each node
+* Export the public key of the key pair to a file on each node
+* Import all exported public keys into a single trust-store
+* Distribute the trust-store over the nodes
+
 ## Configuring Ports for Network Security
 
 Spark makes heavy use of the network, and some environments have strict requirements for using tight

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index b46df12..9805609 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -45,7 +45,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
 
   // Hadoop FileSystem object for our URI, if it isn't using HTTP
   var fileSystem: FileSystem = {
-    if (uri.getScheme() == "http") {
+    if (Set("http", "https", "ftp").contains(uri.getScheme)) {
       null
     } else {
       FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
@@ -78,13 +78,16 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
         if (fileSystem != null) {
           fileSystem.open(new Path(directory, pathInDirectory))
         } else {
-          if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+          val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
             val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
             val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
-            newuri.toURL().openStream()
+            newuri.toURL
           } else {
-            new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+            new URL(classUri + "/" + urlEncode(pathInDirectory))
           }
+
+          Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager)
+            .getInputStream
         }
       }
       val bytes = readAndTransformClass(name, inputStream)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 716cf2c..7d29ed8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -67,8 +67,12 @@ private[streaming] class ReceiverSupervisorImpl(
   private val trackerActor = {
     val ip = env.conf.get("spark.driver.host", "localhost")
     val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
-      SparkEnv.driverActorSystemName, ip, port)
+    val url = AkkaUtils.address(
+      AkkaUtils.protocol(env.actorSystem),
+      SparkEnv.driverActorSystemName,
+      ip,
+      port,
+      "ReceiverTracker")
     env.actorSystem.actorSelection(url)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eb328b2..37e98e0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -244,7 +244,9 @@ private[spark] class ApplicationMaster(
       host: String,
       port: String,
       isDriver: Boolean): Unit = {
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       host,
       port,

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 040406c..0dbb615 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,9 +34,10 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.AkkaUtils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -106,7 +107,9 @@ private[yarn] class YarnAllocator(
     new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
   launcherPool.allowCoreThreadTimeOut(true)
 
-  private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+  private val driverUrl = AkkaUtils.address(
+    AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled),
+    SparkEnv.driverActorSystemName,
     sparkConf.get("spark.driver.host"),
     sparkConf.get("spark.driver.port"),
     CoarseGrainedSchedulerBackend.ACTOR_NAME)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: Spark 3883: SSL support for HttpServer and Akka

Posted by jo...@apache.org.
Spark 3883: SSL support for HttpServer and Akka

SPARK-3883: SSL support for Akka connections and Jetty based file servers.

This story introduced the following changes:
- Introduced SSLOptions object which holds the SSL configuration and can build the appropriate configuration for Akka or Jetty. SSLOptions can be created by parsing SparkConf entries at a specified namespace.
- SSLOptions is created and kept by SecurityManager
- All Akka actor address creation snippets based on interpolated strings were replaced by a dedicated methods from AkkaUtils. Those methods select the proper Akka protocol - whether akka.tcp or akka.ssl.tcp
- Added tests cases for AkkaUtils, FileServer, SSLOptions and SecurityManager
- Added a way to use node local SSL configuration by executors and driver in standalone mode. It can be done by specifying spark.ssl.useNodeLocalConf in SparkConf.
- Made CoarseGrainedExecutorBackend not overwrite the settings which are executor startup configuration - they are passed anyway from Worker

Refer to https://github.com/apache/spark/pull/3571 for discussion and details

Author: Jacek Lewandowski <le...@gmail.com>
Author: Jacek Lewandowski <ja...@datastax.com>

Closes #3571 from jacek-lewandowski/SPARK-3883-master and squashes the following commits:

9ef4ed1 [Jacek Lewandowski] Merge pull request #2 from jacek-lewandowski/SPARK-3883-docs2
fb31b49 [Jacek Lewandowski] SPARK-3883: Added SSL setup documentation
2532668 [Jacek Lewandowski] SPARK-3883: Refactored AkkaUtils.protocol method to not use Try
90a8762 [Jacek Lewandowski] SPARK-3883: Refactored methods to resolve Akka address and made it possible to easily configure multiple communication layers for SSL
72b2541 [Jacek Lewandowski] SPARK-3883: A reference to the fallback SSLOptions can be provided when constructing SSLOptions
93050f4 [Jacek Lewandowski] SPARK-3883: SSL support for HttpServer and Akka


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfea3003
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfea3003
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfea3003

Branch: refs/heads/master
Commit: cfea30037ff4ac7e386a1478e7dce07ca3bb9072
Parents: ef65cf0
Author: Jacek Lewandowski <le...@gmail.com>
Authored: Mon Feb 2 17:18:54 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 2 17:27:26 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpServer.scala     |  11 +-
 .../scala/org/apache/spark/SSLOptions.scala     | 178 +++++++++++++++++
 .../org/apache/spark/SecurityManager.scala      | 100 +++++++++-
 .../main/scala/org/apache/spark/SparkConf.scala |   1 +
 .../apache/spark/broadcast/HttpBroadcast.scala  |   1 +
 .../spark/deploy/ApplicationDescription.scala   |   9 +
 .../scala/org/apache/spark/deploy/Client.scala  |   5 +-
 .../apache/spark/deploy/DriverDescription.scala |   8 +
 .../apache/spark/deploy/client/AppClient.scala  |   7 +-
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  67 +++++--
 .../executor/CoarseGrainedExecutorBackend.scala |  16 +-
 .../cluster/SimrSchedulerBackend.scala          |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   5 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   5 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  36 +++-
 .../scala/org/apache/spark/util/Utils.scala     |  20 +-
 core/src/test/resources/keystore                | Bin 0 -> 2247 bytes
 core/src/test/resources/truststore              | Bin 0 -> 957 bytes
 core/src/test/resources/untrusted-keystore      | Bin 0 -> 2246 bytes
 .../org/apache/spark/FileServerSuite.scala      |  90 +++++++++
 .../apache/spark/MapOutputTrackerSuite.scala    |   2 +-
 .../org/apache/spark/SSLOptionsSuite.scala      | 123 ++++++++++++
 .../org/apache/spark/SSLSampleConfigs.scala     |  55 ++++++
 .../org/apache/spark/SecurityManagerSuite.scala |  50 ++++-
 .../org/apache/spark/deploy/ClientSuite.scala   |   1 +
 .../spark/deploy/master/MasterSuite.scala       |  26 ++-
 .../spark/deploy/worker/WorkerSuite.scala       |  57 ++++++
 .../org/apache/spark/util/AkkaUtilsSuite.scala  | 197 +++++++++++++++++--
 docs/configuration.md                           |  80 ++++++++
 docs/security.md                                |  24 +++
 .../apache/spark/repl/ExecutorClassLoader.scala |  11 +-
 .../receiver/ReceiverSupervisorImpl.scala       |   8 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   4 +-
 .../spark/deploy/yarn/YarnAllocator.scala       |   7 +-
 36 files changed, 1145 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index fa22787..09a9ccc 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
+import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
    */
   private def doStart(startPort: Int): (Server, Int) = {
     val server = new Server()
-    val connector = new SocketConnector
+
+    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
+      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
     connector.setMaxIdleTime(60 * 1000)
     connector.setSoLingerTime(-1)
     connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
   }
 
   /**
-   * Get the URI of this HTTP server (http://host:port)
+   * Get the URI of this HTTP server (http://host:port or https://host:port)
    */
   def uri: String = {
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      "http://" + Utils.localIpAddress + ":" + port
+      val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
+      s"$scheme://${Utils.localIpAddress}:$port"
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SSLOptions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
new file mode 100644
index 0000000..2cdc167
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+/**
+ * SSLOptions class is a common container for SSL configuration options. It offers methods to
+ * generate specific objects to configure SSL for different communication protocols.
+ *
+ * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
+ * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
+ * authentication with SSL, SSLOptions cannot support it either.
+ *
+ * @param enabled             enables or disables SSL; if it is set to false, the rest of the
+ *                            settings are disregarded
+ * @param keyStore            a path to the key-store file
+ * @param keyStorePassword    a password to access the key-store file
+ * @param keyPassword         a password to access the private key in the key-store
+ * @param trustStore          a path to the trust-store file
+ * @param trustStorePassword  a password to access the trust-store file
+ * @param protocol            SSL protocol (remember that SSLv3 was compromised) supported by Java
+ * @param enabledAlgorithms   a set of encryption algorithms to use
+ */
+private[spark] case class SSLOptions(
+    enabled: Boolean = false,
+    keyStore: Option[File] = None,
+    keyStorePassword: Option[String] = None,
+    keyPassword: Option[String] = None,
+    trustStore: Option[File] = None,
+    trustStorePassword: Option[String] = None,
+    protocol: Option[String] = None,
+    enabledAlgorithms: Set[String] = Set.empty) {
+
+  /**
+   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
+   */
+  def createJettySslContextFactory(): Option[SslContextFactory] = {
+    if (enabled) {
+      val sslContextFactory = new SslContextFactory()
+
+      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
+      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
+      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+      protocol.foreach(sslContextFactory.setProtocol)
+      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+
+      Some(sslContextFactory)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Creates an Akka configuration object which contains all the SSL settings represented by this
+   * object. It can be used then to compose the ultimate Akka configuration.
+   */
+  def createAkkaConfig: Option[Config] = {
+    import scala.collection.JavaConversions._
+    if (enabled) {
+      Some(ConfigFactory.empty()
+        .withValue("akka.remote.netty.tcp.security.key-store",
+          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-store-password",
+          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store",
+          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store-password",
+          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-password",
+          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.random-number-generator",
+          ConfigValueFactory.fromAnyRef(""))
+        .withValue("akka.remote.netty.tcp.security.protocol",
+          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
+          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+        .withValue("akka.remote.netty.tcp.enable-ssl",
+          ConfigValueFactory.fromAnyRef(true)))
+    } else {
+      None
+    }
+  }
+
+  /** Returns a string representation of this SSLOptions with all the passwords masked. */
+  override def toString: String = s"SSLOptions{enabled=$enabled, " +
+      s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+      s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+      s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+
+}
+
+private[spark] object SSLOptions extends Logging {
+
+  /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+    *
+    * The following settings are allowed:
+    * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+    * $ - `[ns].keyStorePassword` - a password to the key-store file
+    * $ - `[ns].keyPassword` - a password to the private key
+    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
+    *                         directory
+    * $ - `[ns].trustStorePassword` - a password to the trust-store file
+    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+    *
+    * For a list of protocols and ciphers supported by particular Java versions, you may go to
+    * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
+    * blog page]].
+    *
+    * You can optionally specify the default configuration. If you do, for each setting which is
+    * missing in SparkConf, the corresponding setting is used from the default configuration.
+    *
+    * @param conf Spark configuration object where the settings are collected from
+    * @param ns the namespace name
+    * @param defaults the default configuration
+    * @return [[org.apache.spark.SSLOptions]] object
+    */
+  def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
+    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+        .orElse(defaults.flatMap(_.keyStore))
+
+    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+        .orElse(defaults.flatMap(_.keyStorePassword))
+
+    val keyPassword = conf.getOption(s"$ns.keyPassword")
+        .orElse(defaults.flatMap(_.keyPassword))
+
+    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+        .orElse(defaults.flatMap(_.trustStore))
+
+    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+        .orElse(defaults.flatMap(_.trustStorePassword))
+
+    val protocol = conf.getOption(s"$ns.protocol")
+        .orElse(defaults.flatMap(_.protocol))
+
+    val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
+        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+        .orElse(defaults.map(_.enabledAlgorithms))
+        .getOrElse(Set.empty)
+
+    new SSLOptions(
+      enabled,
+      keyStore,
+      keyStorePassword,
+      keyPassword,
+      trustStore,
+      trustStorePassword,
+      protocol,
+      enabledAlgorithms)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index ec82d09..88d35a4 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
 package org.apache.spark
 
 import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._
 
+import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
  * who always have permission to view or modify the Spark application.
  *
- * Spark does not currently support encryption after authentication.
+ * Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
  *
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:
@@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to connect to the server. There is no control of the underlying
  *            authentication mechanism so its not clear if the password is passed in
  *            plaintext or uses DIGEST-MD5 or some other mechanism.
- *            Akka also has an option to turn on SSL, this option is not currently supported
- *            but we could add a configuration option in the future.
+ *
+ *            Akka also has an option to turn on SSL, this option is currently supported (see
+ *            the details below).
  *
  *  - HTTP for broadcast and file server (via HttpServer) ->  Spark currently uses Jetty
  *            for the HttpServer. Jetty supports multiple authentication mechanisms -
@@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to authenticate using DIGEST-MD5 via a single user and the shared secret.
  *            Since we are using DIGEST-MD5, the shared secret is not passed on the wire
  *            in plaintext.
- *            We currently do not support SSL (https), but Jetty can be configured to use it
- *            so we could add a configuration option for this in the future.
+ *
+ *            We currently support SSL (https) for this communication protocol (see the details
+ *            below).
  *
  *            The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
  *            Any clients must specify the user and password. There is a default
@@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *  authentication. Spark will then use that user to compare against the view acls to do
  *  authorization. If not filter is in place the user is generally null and no authorization
  *  can take place.
+ *
+ *  Connection encryption (SSL) configuration is organized hierarchically. The user can configure
+ *  the default SSL settings which will be used for all the supported communication protocols unless
+ *  they are overwritten by protocol specific settings. This way the user can easily provide the
+ *  common settings for all the protocols without disabling the ability to configure each one
+ *  individually.
+ *
+ *  All the SSL settings like `spark.ssl.xxx` where `xxx` is a particular configuration property,
+ *  denote the global configuration for all the supported protocols. In order to override the global
+ *  configuration for the particular protocol, the properties must be overwritten in the
+ *  protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
+ *  configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
+ *  Akka based connections or `fs` for broadcast and file server.
+ *
+ *  Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
+ *  options that can be specified.
+ *
+ *  SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
+ *  object parses Spark configuration at a given namespace and builds the common representation
+ *  of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
+ *  TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ *
+ *  SSL must be configured on each node and configured for each component involved in
+ *  communication using the particular protocol. In YARN clusters, the key-store can be prepared on
+ *  the client side then distributed and used by the executors as the part of the application
+ *  (YARN allows the user to deploy files before the application is started).
+ *  In standalone deployment, the user needs to provide key-stores and configuration
+ *  options for master and workers. In this mode, the user may allow the executors to use the SSL
+ *  settings inherited from the worker which spawned that executor. It can be accomplished by
+ *  setting `spark.ssl.useNodeLocalConf` to `true`.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+  extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
     )
   }
 
+  // the default SSL configuration - it will be used by all communication layers unless overwritten
+  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
+
+  // SSL configuration for different communication layers - they can override the default
+  // configuration at a specified namespace. The namespace *must* start with spark.ssl.
+  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
+  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+
+  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
+  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+
+  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
+    val trustStoreManagers =
+      for (trustStore <- fileServerSSLOptions.trustStore) yield {
+        val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
+
+        try {
+          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
+
+          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+          tmf.init(ks)
+          tmf.getTrustManagers
+        } finally {
+          input.close()
+        }
+      }
+
+    lazy val credulousTrustStoreManagers = Array({
+      logWarning("Using 'accept-all' trust manager for SSL connections.")
+      new X509TrustManager {
+        override def getAcceptedIssuers: Array[X509Certificate] = null
+
+        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+      }: TrustManager
+    })
+
+    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
+    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+    val hostVerifier = new HostnameVerifier {
+      override def verify(s: String, sslSession: SSLSession): Boolean = true
+    }
+
+    (Some(sslContext.getSocketFactory), Some(hostVerifier))
+  } else {
+    (None, None)
+  }
+
   /**
    * Split a comma separated String, filter out any empty items, and return a Set of strings
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4d4c69d..13aa996 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,6 +370,7 @@ private[spark] object SparkConf {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
     name.startsWith("spark.auth") ||
+    name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index ea98051..1444c0d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
       uc = new URL(url).openConnection()
       uc.setConnectTimeout(httpReadTimeout)
     }
+    Utils.setupSecureURLConnection(uc, securityManager)
 
     val in = {
       uc.setReadTimeout(httpReadTimeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 65a1a8f..ae55b4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(
 
   val user = System.getProperty("user.name", "<unknown>")
 
+  def copy(
+      name: String = name,
+      maxCores: Option[Int] = maxCores,
+      memoryPerSlave: Int = memoryPerSlave,
+      command: Command = command,
+      appUiUrl: String = appUiUrl,
+      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7c1c831..38b3da0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   val timeout = AkkaUtils.askTimeout(conf)
 
   override def preStart() = {
-    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
+    masterActor = context.actorSelection(
+      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
 
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -161,7 +162,7 @@ object Client {
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master)
+    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 58c95dc..b056a19 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
     val command: Command)
   extends Serializable {
 
+  def copy(
+      jarUrl: String = jarUrl,
+      mem: Int = mem,
+      cores: Int = cores,
+      supervise: Boolean = supervise,
+      command: Command = command): DriverDescription =
+    new DriverDescription(jarUrl, mem, cores, supervise, command)
+
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 39a7b03..ffe940f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -47,7 +47,7 @@ private[spark] class AppClient(
     conf: SparkConf)
   extends Logging {
 
-  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
@@ -107,8 +107,9 @@ private[spark] class AppClient(
     def changeMaster(url: String) {
       // activeMasterUrl is a valid Spark url since we receive it from master.
       activeMasterUrl = url
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = Master.toAkkaAddress(activeMasterUrl)
+      master = context.actorSelection(
+        Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem)))
+      masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem))
     }
 
     private def isPossibleMaster(remoteUrl: Address) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d92d993..5eeb9fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -860,9 +860,9 @@ private[spark] object Master extends Logging {
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaUrl(sparkUrl: String): String = {
+  def toAkkaUrl(sparkUrl: String, protocol: String): String = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+    AkkaUtils.address(protocol, systemName, host, port, actorName)
   }
 
   /**
@@ -870,9 +870,9 @@ private[spark] object Master extends Logging {
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaAddress(sparkUrl: String): Address = {
+  def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    Address("akka.tcp", systemName, host, port)
+    Address(protocol, systemName, host, port)
   }
 
   def startSystemAndActor(

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index acbdf0d..bc9f78b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.util.logging.FileAppender
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1359983..b20f5c0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,8 +31,8 @@ import scala.util.Random
 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -93,7 +93,12 @@ private[spark] class Worker(
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
   var activeMasterWebUiUrl : String = ""
-  val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
+  val akkaUrl = AkkaUtils.address(
+    AkkaUtils.protocol(context.system),
+    actorSystemName,
+    host,
+    port,
+    actorName)
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
@@ -174,8 +179,9 @@ private[spark] class Worker(
     // activeMasterUrl it's a valid Spark url since we receive it from master.
     activeMasterUrl = url
     activeMasterWebUiUrl = uiUrl
-    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-    masterAddress = Master.toAkkaAddress(activeMasterUrl)
+    master = context.actorSelection(
+      Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
+    masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
     connected = true
     // Cancel any outstanding re-registration attempts because we found a new master
     registrationRetryTimer.foreach(_.cancel())
@@ -347,10 +353,20 @@ private[spark] class Worker(
             }.toSeq
           }
           appDirectories(appId) = appLocalDirs
-
-          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
-            ExecutorState.LOADING)
+          val manager = new ExecutorRunner(
+            appId,
+            execId,
+            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
+            cores_,
+            memory_,
+            self,
+            workerId,
+            host,
+            sparkHome,
+            executorDir,
+            akkaUrl,
+            conf,
+            appLocalDirs, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -406,7 +422,14 @@ private[spark] class Worker(
 
     case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+      val driver = new DriverRunner(
+        conf,
+        driverId,
+        workDir,
+        sparkHome,
+        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
+        self,
+        akkaUrl)
       drivers(driverId) = driver
       driver.start()
 
@@ -523,10 +546,32 @@ private[spark] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf, securityManager = securityMgr)
-    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
       masterAkkaUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
     (actorSystem, boundPort)
   }
 
+  private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
+    val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
+    val result = cmd.javaOpts.collectFirst {
+      case pattern(_result) => _result.toBoolean
+    }
+    result.getOrElse(false)
+  }
+
+  private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
+    val prefix = "spark.ssl."
+    val useNLC = "spark.ssl.useNodeLocalConf"
+    if (isUseLocalNodeSSLConfig(cmd)) {
+      val newJavaOpts = cmd.javaOpts
+          .filter(opt => !opt.startsWith(s"-D$prefix")) ++
+          conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+
+          s"-D$useNLC=true"
+      cmd.copy(javaOpts = newJavaOpts)
+    } else {
+      cmd
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8238253..bc72c89 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val executorConf = new SparkConf
       val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
+        "driverPropsFetcher",
+        hostname,
+        port,
+        executorConf,
+        new SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       fetcher.shutdown()
 
       // Create SparkEnv using properties we fetched from the driver.
-      val driverConf = new SparkConf().setAll(props)
+      val driverConf = new SparkConf()
+      for ((key, value) <- props) {
+        // this is required for SSL in standalone mode
+        if (SparkConf.isExecutorStartupConf(key)) {
+          driverConf.setIfMissing(key, value)
+        } else {
+          driverConf.set(key, value)
+        }
+      }
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ee10aa0..06786a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.AkkaUtils
 
 private[spark] class SimrSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -38,7 +39,8 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       sc.conf.get("spark.driver.host"),
       sc.conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb87a5..d2e1680 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class SparkDeploySchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -46,7 +46,8 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5289661..0d1c2a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
 import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -143,7 +143,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+    val driverUrl = AkkaUtils.address(
+      AkkaUtils.protocol(sc.env.actorSystem),
       SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 4c9b1e3..3d9c619 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Try
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -91,8 +92,11 @@ private[spark] object AkkaUtils extends Logging {
     val secureCookie = if (isAuthOn) secretKey else ""
     logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
 
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
-      ConfigFactory.parseString(
+    val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
+        .getOrElse(ConfigFactory.empty())
+
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+      .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -214,7 +218,7 @@ private[spark] object AkkaUtils extends Logging {
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
+    val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
@@ -228,9 +232,33 @@ private[spark] object AkkaUtils extends Logging {
       actorSystem: ActorSystem): ActorRef = {
     val executorActorSystemName = SparkEnv.executorActorSystemName
     Utils.checkHost(host, "Expected hostname")
-    val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+    val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
   }
+
+  def protocol(actorSystem: ActorSystem): String = {
+    val akkaConf = actorSystem.settings.config
+    val sslProp = "akka.remote.netty.tcp.enable-ssl"
+    protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp))
+  }
+
+  def protocol(ssl: Boolean = false): String = {
+    if (ssl) {
+      "akka.ssl.tcp"
+    } else {
+      "akka.tcp"
+    }
+  }
+
+  def address(
+      protocol: String,
+      systemName: String,
+      host: String,
+      port: Any,
+      actorName: String): String = {
+    s"$protocol://$systemName@$host:$port/user/$actorName"
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 31850b5..e9f2aed 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,8 +21,9 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.{Locale, Properties, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import javax.net.ssl.HttpsURLConnection
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -575,6 +576,7 @@ private[spark] object Utils extends Logging {
           logDebug("fetchFile not using security")
           uc = new URL(url).openConnection()
         }
+        Utils.setupSecureURLConnection(uc, securityMgr)
 
         val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
         uc.setConnectTimeout(timeout)
@@ -1820,6 +1822,20 @@ private[spark] object Utils extends Logging {
     PropertyConfigurator.configure(pro)
   }
 
+  /**
+   * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and
+   * the host verifier from the given security manager.
+   */
+  def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
+    urlConnection match {
+      case https: HttpsURLConnection =>
+        sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
+        sm.hostnameVerifier.foreach(https.setHostnameVerifier)
+        https
+      case connection => connection
+    }
+  }
+
   def invoke(
       clazz: Class[_],
       obj: AnyRef,

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/keystore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/keystore b/core/src/test/resources/keystore
new file mode 100644
index 0000000..f8310e3
Binary files /dev/null and b/core/src/test/resources/keystore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/truststore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/truststore b/core/src/test/resources/truststore
new file mode 100644
index 0000000..a6b1d46
Binary files /dev/null and b/core/src/test/resources/truststore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/resources/untrusted-keystore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/untrusted-keystore b/core/src/test/resources/untrusted-keystore
new file mode 100644
index 0000000..6015b02
Binary files /dev/null and b/core/src/test/resources/untrusted-keystore differ

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 0f49ce4..5fdf6bc 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -18,13 +18,19 @@
 package org.apache.spark
 
 import java.io._
+import java.net.URI
 import java.util.jar.{JarEntry, JarOutputStream}
+import javax.net.ssl.SSLHandshakeException
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.commons.lang3.RandomUtils
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.Utils
 
+import SSLSampleConfigs._
+
 class FileServerSuite extends FunSuite with LocalSparkContext {
 
   @transient var tmpDir: File = _
@@ -168,4 +174,88 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("HttpFileServer should work with SSL") {
+    val sparkConf = sparkSSLConfig()
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      fileTransferTest(server, sm)
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should work with SSL and good credentials") {
+    val sparkConf = sparkSSLConfig()
+    sparkConf.set("spark.authenticate", "true")
+    sparkConf.set("spark.authenticate.secret", "good")
+
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      fileTransferTest(server, sm)
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should not work with valid SSL and bad credentials") {
+    val sparkConf = sparkSSLConfig()
+    sparkConf.set("spark.authenticate", "true")
+    sparkConf.set("spark.authenticate.secret", "bad")
+
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      intercept[IOException] {
+        fileTransferTest(server)
+      }
+    } finally {
+      server.stop()
+    }
+  }
+
+  test ("HttpFileServer should not work with SSL when the server is untrusted") {
+    val sparkConf = sparkSSLConfigUntrusted()
+    val sm = new SecurityManager(sparkConf)
+    val server = new HttpFileServer(sparkConf, sm, 0)
+    try {
+      server.initialize()
+
+      intercept[SSLHandshakeException] {
+        fileTransferTest(server)
+      }
+    } finally {
+      server.stop()
+    }
+  }
+
+  def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
+    val randomContent = RandomUtils.nextBytes(100)
+    val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
+    FileUtils.writeByteArrayToFile(file, randomContent)
+    server.addFile(file)
+
+    val uri = new URI(server.serverUri + "/files/" + file.getName)
+
+    val connection = if (sm != null && sm.isAuthenticationEnabled()) {
+      Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
+    } else {
+      uri.toURL.openConnection()
+    }
+
+    if (sm != null) {
+      Utils.setupSecureURLConnection(connection, sm)
+    }
+
+    val buf = IOUtils.toByteArray(connection.getInputStream)
+    assert(buf === randomContent)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d27880f..ccfe067 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -120,7 +120,7 @@ class MapOutputTrackerSuite extends FunSuite {
       securityManager = new SecurityManager(conf))
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
new file mode 100644
index 0000000..444a333
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
+
+  test("test resolving property file as spark conf ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val opts = SSLOptions.parse(conf, "spark.ssl")
+
+    assert(opts.enabled === true)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("password"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+  }
+
+  test("test resolving property with defaults specified ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === true)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("password"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+  }
+
+  test("test whether defaults can be overridden ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ui.ssl.enabled", "false")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ui.ssl.keyStorePassword", "12345")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === false)
+    assert(opts.trustStore.isDefined === true)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.keyStore.isDefined === true)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("12345"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
new file mode 100644
index 0000000..ace8123
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+object SSLSampleConfigs {
+  val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+  val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
+  val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+
+  def sparkSSLConfig() = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+    conf.set("spark.ssl.protocol", "TLSv1")
+    conf
+  }
+
+  def sparkSSLConfigUntrusted() = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", untrustedKeyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_DES_CBC_SHA")
+    conf.set("spark.ssl.protocol", "TLSv1")
+    conf
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index fcca086..43fbd3f 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.ArrayBuffer
+import java.io.File
 
 import org.scalatest.FunSuite
 
@@ -125,6 +125,54 @@ class SecurityManagerSuite extends FunSuite {
 
   }
 
+  test("ssl on setup") {
+    val conf = SSLSampleConfigs.sparkSSLConfig()
+
+    val securityManager = new SecurityManager(conf)
+
+    assert(securityManager.fileServerSSLOptions.enabled === true)
+    assert(securityManager.akkaSSLOptions.enabled === true)
+
+    assert(securityManager.sslSocketFactory.isDefined === true)
+    assert(securityManager.hostnameVerifier.isDefined === true)
+
+    assert(securityManager.fileServerSSLOptions.trustStore.isDefined === true)
+    assert(securityManager.fileServerSSLOptions.trustStore.get.getName === "truststore")
+    assert(securityManager.fileServerSSLOptions.keyStore.isDefined === true)
+    assert(securityManager.fileServerSSLOptions.keyStore.get.getName === "keystore")
+    assert(securityManager.fileServerSSLOptions.trustStorePassword === Some("password"))
+    assert(securityManager.fileServerSSLOptions.keyStorePassword === Some("password"))
+    assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
+    assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1"))
+    assert(securityManager.fileServerSSLOptions.enabledAlgorithms ===
+        Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
+
+    assert(securityManager.akkaSSLOptions.trustStore.isDefined === true)
+    assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore")
+    assert(securityManager.akkaSSLOptions.keyStore.isDefined === true)
+    assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore")
+    assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password"))
+    assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password"))
+    assert(securityManager.akkaSSLOptions.keyPassword === Some("password"))
+    assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1"))
+    assert(securityManager.akkaSSLOptions.enabledAlgorithms ===
+        Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
+  }
+
+  test("ssl off setup") {
+    val file = File.createTempFile("SSLOptionsSuite", "conf")
+    file.deleteOnExit()
+
+    System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
+    val conf = new SparkConf()
+
+    val securityManager = new SecurityManager(conf)
+
+    assert(securityManager.fileServerSSLOptions.enabled === false)
+    assert(securityManager.akkaSSLOptions.enabled === false)
+    assert(securityManager.sslSocketFactory.isDefined === false)
+    assert(securityManager.hostnameVerifier.isDefined === false)
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
index d2dae34..518073d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.Matchers
 class ClientSuite extends FunSuite with Matchers {
   test("correctly validates driver jar URL's") {
     ClientArguments.isValidJarUrl("http://someHost:8080/foo.jar") should be (true)
+    ClientArguments.isValidJarUrl("https://someHost:8080/foo.jar") should be (true)
 
     // file scheme with authority and path is valid.
     ClientArguments.isValidJarUrl("file://somehost/path/to/a/jarFile.jar") should be (true)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 3d2335f..34c74d8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -20,30 +20,46 @@ package org.apache.spark.deploy.master
 import akka.actor.Address
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SSLOptions, SparkConf, SparkException}
 
 class MasterSuite extends FunSuite {
 
   test("toAkkaUrl") {
-    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
+    val conf = new SparkConf(loadDefaults = false)
+    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.tcp")
     assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
   }
 
+  test("toAkkaUrl with SSL") {
+    val conf = new SparkConf(loadDefaults = false)
+    val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.ssl.tcp")
+    assert("akka.ssl.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
+  }
+
   test("toAkkaUrl: a typo url") {
+    val conf = new SparkConf(loadDefaults = false)
     val e = intercept[SparkException] {
-      Master.toAkkaUrl("spark://1.2. 3.4:1234")
+      Master.toAkkaUrl("spark://1.2. 3.4:1234", "akka.tcp")
     }
     assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
   }
 
   test("toAkkaAddress") {
-    val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
+    val conf = new SparkConf(loadDefaults = false)
+    val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.tcp")
     assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
   }
 
+  test("toAkkaAddress with SSL") {
+    val conf = new SparkConf(loadDefaults = false)
+    val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.ssl.tcp")
+    assert(Address("akka.ssl.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
+  }
+
   test("toAkkaAddress: a typo url") {
+    val conf = new SparkConf(loadDefaults = false)
     val e = intercept[SparkException] {
-      Master.toAkkaAddress("spark://1.2. 3.4:1234")
+      Master.toAkkaAddress("spark://1.2. 3.4:1234", "akka.tcp")
     }
     assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
new file mode 100644
index 0000000..84e2fd7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.Command
+
+import org.scalatest.{Matchers, FunSuite}
+
+class WorkerSuite extends FunSuite with Matchers {
+
+  def cmd(javaOpts: String*) = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
+  def conf(opts: (String, String)*) = new SparkConf(loadDefaults = false).setAll(opts)
+
+  test("test isUseLocalNodeSSLConfig") {
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=false")) shouldBe false
+    Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=")) shouldBe false
+  }
+
+  test("test maybeUpdateSSLSettings") {
+    Worker.maybeUpdateSSLSettings(
+      cmd("-Dasdf=dfgh", "-Dspark.ssl.opt1=x"),
+      conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+        .javaOpts should contain theSameElementsInOrderAs Seq(
+          "-Dasdf=dfgh", "-Dspark.ssl.opt1=x")
+
+    Worker.maybeUpdateSSLSettings(
+      cmd("-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x"),
+      conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+        .javaOpts should contain theSameElementsInOrderAs Seq(
+          "-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x")
+
+    Worker.maybeUpdateSSLSettings(
+      cmd("-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=x"),
+      conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z"))
+        .javaOpts should contain theSameElementsAs Seq(
+          "-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfea3003/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 6bbf72e..39e5d36 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.util.concurrent.TimeoutException
+
 import scala.concurrent.Await
 
 import akka.actor._
@@ -26,6 +28,7 @@ import org.scalatest.FunSuite
 import org.apache.spark._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.SSLSampleConfigs._
 
 
 /**
@@ -47,7 +50,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "true")
@@ -60,7 +63,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = conf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     intercept[akka.actor.ActorNotFound] {
       slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -74,7 +77,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     val conf = new SparkConf
     conf.set("spark.authenticate", "false")
     conf.set("spark.authenticate.secret", "bad")
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -85,18 +88,18 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
     badconf.set("spark.authenticate.secret", "good")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
       conf = badconf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
@@ -124,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     val conf = new SparkConf
     conf.set("spark.authenticate", "true")
     conf.set("spark.authenticate.secret", "good")
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -135,12 +138,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val goodconf = new SparkConf
     goodconf.set("spark.authenticate", "true")
     goodconf.set("spark.authenticate.secret", "good")
-    val securityManagerGood = new SecurityManager(goodconf);
+    val securityManagerGood = new SecurityManager(goodconf)
 
     assert(securityManagerGood.isAuthenticationEnabled() === true)
 
@@ -148,7 +151,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = goodconf, securityManager = securityManagerGood)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
@@ -175,7 +178,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     conf.set("spark.authenticate", "true")
     conf.set("spark.authenticate.secret", "good")
 
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
 
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
@@ -186,12 +189,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
     badconf.set("spark.authenticate.secret", "bad")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     assert(securityManagerBad.isAuthenticationEnabled() === false)
 
@@ -199,7 +202,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
       conf = badconf, securityManager = securityManagerBad)
     val slaveTracker = new MapOutputTrackerWorker(conf)
     val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
     val timeout = AkkaUtils.lookupTimeout(conf)
     intercept[akka.actor.ActorNotFound] {
       slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
@@ -209,4 +212,170 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     slaveSystem.shutdown()
   }
 
+  test("remote fetch ssl on") {
+    val conf = sparkSSLConfig()
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === false)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val slaveConf = sparkSSLConfig()
+    val securityManagerBad = new SecurityManager(slaveConf)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = slaveConf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    val selection = slaveSystem.actorSelection(
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+    assert(securityManagerBad.isAuthenticationEnabled() === false)
+
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    masterTracker.registerMapOutput(10, 0,
+      MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    // this should succeed since security off
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+      Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+
+  test("remote fetch ssl on and security enabled") {
+    val conf = sparkSSLConfig()
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === true)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val slaveConf = sparkSSLConfig()
+    slaveConf.set("spark.authenticate", "true")
+    slaveConf.set("spark.authenticate.secret", "good")
+    val securityManagerBad = new SecurityManager(slaveConf)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = slaveConf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    val selection = slaveSystem.actorSelection(
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+    assert(securityManagerBad.isAuthenticationEnabled() === true)
+
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    masterTracker.registerMapOutput(10, 0,
+      MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+      Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+
+  test("remote fetch ssl on and security enabled - bad credentials") {
+    val conf = sparkSSLConfig()
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === true)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val slaveConf = sparkSSLConfig()
+    slaveConf.set("spark.authenticate", "true")
+    slaveConf.set("spark.authenticate.secret", "bad")
+    val securityManagerBad = new SecurityManager(slaveConf)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = slaveConf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    val selection = slaveSystem.actorSelection(
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    intercept[akka.actor.ActorNotFound] {
+      slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+    }
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+
+  test("remote fetch ssl on - untrusted server") {
+    val conf = sparkSSLConfigUntrusted()
+    val securityManager = new SecurityManager(conf)
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === false)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val slaveConf = sparkSSLConfig()
+    val securityManagerBad = new SecurityManager(slaveConf)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = slaveConf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    val selection = slaveSystem.actorSelection(
+      AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker"))
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    intercept[TimeoutException] {
+      slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+    }
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org