Posted to commits@spark.apache.org by mr...@apache.org on 2023/10/14 07:36:35 UTC

[spark] branch master updated: [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26aaf1ca04c [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf
26aaf1ca04c is described below

commit 26aaf1ca04cd5347ce65493fa0e6c802728a1806
Author: Hasnain Lakhani <ha...@databricks.com>
AuthorDate: Sat Oct 14 02:36:23 2023 -0500

    [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf
    
    ### What changes were proposed in this pull request?
    
    This PR adds the options introduced in https://github.com/apache/spark/pull/43220 to `SSLOptions` and `SparkTransportConf`.
    
    Adding them to `SSLOptions` lets us support inheritance of options, so the SSL settings for the UI and for RPC can be shared as much as possible. The `SparkTransportConf` changes are needed to propagate these settings to the transport layer.
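    For illustration, here is a minimal sketch of how the inheritance and propagation fit together after this change (assuming a `SparkConf` and Hadoop `Configuration` are already in scope; the module name and core count are just example values):
    
    ```
    // Parse the shared defaults, then the RPC-specific options, which fall
    // back to those defaults for anything not set under spark.ssl.rpc.*
    val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
    val rpcOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaultOpts))
    
    // SparkTransportConf now takes the parsed options, so the transport
    // layer sees the resolved spark.ssl.rpc.* values
    val transportConf = SparkTransportConf.fromSparkConf(
      conf, "rpc", numUsableCores = 0, sslOptions = Some(rpcOpts))
    ```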
    
    I also make some changes to `SecurityManager` to log when this feature is enabled, and to make the existing `spark.network.crypto` options mutually exclusive with these new settings (enabling both would just mean double encryption).
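    A rough sketch of the resulting behavior, based on the `SecurityManager` change in this diff (the assertions mirror the new `isEncryptionEnabled()` logic):
    
    ```
    val conf = new SparkConf()
      .set("spark.network.crypto.enabled", "true")
      .set("spark.ssl.rpc.enabled", "true")
    val sm = new SecurityManager(conf)
    
    // With both enabled, RPC SSL wins: isEncryptionEnabled() logs
    // "Network encryption disabled as RPC SSL encryption is enabled"
    // and reports false, so traffic is not double encrypted.
    assert(!sm.isEncryptionEnabled())
    assert(sm.isSslRpcEnabled())
    ```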
    
    Lastly, these flags are propagated down to executor launch, and the passwords can be sent via environment variables (similar to how it is done for the existing auth secret). This keeps them out of plaintext command lines while still making them available at executor startup (otherwise the executor could never talk to the driver/worker).
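    Sketched roughly, the password plumbing looks like this (the names are the ones added in this diff; the worker-side wiring is simplified):
    
    ```
    // Worker side: pass passwords as env vars instead of java opts
    // (CommandUtils filters -Dspark.ssl.rpc.*Password out of javaOpts)
    val sslEnv: Map[String, String] = securityMgr.getEnvironmentForSslRpcPasswords
    // e.g. Map("_SPARK_SSL_RPC_KEY_STORE_PASSWORD" -> "...", ...)
    
    // Executor side: SSLOptions.parse falls back to the environment when
    // neither the config nor the Hadoop credential provider has a password
    val keyStorePassword = conf.getWithSubstitution("spark.ssl.rpc.keyStorePassword")
      .orElse(Option(conf.getenv(SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD)))
    ```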
    
    ### Why are the changes needed?
    
    The propagation of these options is needed for the RPC functionality to work.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    CI
    
    Added some unit tests which I verified pass:
    
    ```
    build/sbt
    > project core
    > testOnly org.apache.spark.SparkConfSuite org.apache.spark.SSLOptionsSuite org.apache.spark.SecurityManagerSuite org.apache.spark.deploy.worker.CommandUtilsSuite
    ```
    
    The rest of the changes and integration were tested as part of https://github.com/apache/spark/pull/42685
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43238 from hasnain-db/spark-tls-ssloptions.
    
    Authored-by: Hasnain Lakhani <ha...@databricks.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../main/scala/org/apache/spark/SSLOptions.scala   | 108 +++++++++++++++++-
 .../scala/org/apache/spark/SecurityManager.scala   |  45 +++++++-
 .../main/scala/org/apache/spark/SparkConf.scala    |   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   2 +-
 .../apache/spark/deploy/worker/CommandUtils.scala  |  13 ++-
 .../spark/network/netty/SparkTransportConf.scala   |  22 ++--
 .../scala/org/apache/spark/SSLOptionsSuite.scala   | 127 +++++++++++++++++++++
 .../scala/org/apache/spark/SparkConfSuite.scala    |   6 +-
 .../spark/deploy/worker/CommandUtilsSuite.scala    |  28 ++++-
 project/MimaExcludes.scala                         |   4 +-
 10 files changed, 336 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index d159f5717b0..51b6b4445ea 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -19,12 +19,16 @@ package org.apache.spark
 
 import java.io.File
 import java.security.NoSuchAlgorithmException
+import java.util.HashMap
+import java.util.Map
 import javax.net.ssl.SSLContext
 
 import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.util.ssl.SslContextFactory
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ConfigProvider
+import org.apache.spark.network.util.MapConfigProvider
 
 /**
  * SSLOptions class is a common container for SSL configuration options. It offers methods to
@@ -33,32 +37,47 @@ import org.apache.spark.internal.Logging
  * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
  * by the protocol, which it can generate the configuration for.
  *
+ * @param namespace           the configuration namespace
  * @param enabled             enables or disables SSL; if it is set to false, the rest of the
  *                            settings are disregarded
  * @param port                the port where to bind the SSL server; if not defined, it will be
  *                            based on the non-SSL port for the same service.
  * @param keyStore            a path to the key-store file
  * @param keyStorePassword    a password to access the key-store file
+ * @param privateKey          a PKCS#8 private key file in PEM format
  * @param keyPassword         a password to access the private key in the key-store
  * @param keyStoreType        the type of the key-store
  * @param needClientAuth      set true if SSL needs client authentication
+ * @param certChain           an X.509 certificate chain file in PEM format
  * @param trustStore          a path to the trust-store file
  * @param trustStorePassword  a password to access the trust-store file
  * @param trustStoreType      the type of the trust-store
+ * @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
+ *                                   its configuration when the trust-store file on disk changes
+ * @param trustStoreReloadIntervalMs the interval, in milliseconds,
+ *                                 when the trust-store will reload its configuration
+ * @param openSslEnabled      enables or disables using an OpenSSL implementation (if available),
+ *                            requires certChain and keyFile arguments
  * @param protocol            SSL protocol (remember that SSLv3 was compromised) supported by Java
  * @param enabledAlgorithms   a set of encryption algorithms that may be used
  */
 private[spark] case class SSLOptions(
+    namespace: Option[String] = None,
     enabled: Boolean = false,
     port: Option[Int] = None,
     keyStore: Option[File] = None,
     keyStorePassword: Option[String] = None,
+    privateKey: Option[File] = None,
     keyPassword: Option[String] = None,
     keyStoreType: Option[String] = None,
     needClientAuth: Boolean = false,
+    certChain: Option[File] = None,
     trustStore: Option[File] = None,
     trustStorePassword: Option[String] = None,
     trustStoreType: Option[String] = None,
+    trustStoreReloadingEnabled: Boolean = false,
+    trustStoreReloadIntervalMs: Int = 10000,
+    openSslEnabled: Boolean = false,
     protocol: Option[String] = None,
     enabledAlgorithms: Set[String] = Set.empty)
     extends Logging {
@@ -134,12 +153,37 @@ private[spark] case class SSLOptions(
     supported
   }
 
+  def createConfigProvider(conf: SparkConf): ConfigProvider = {
+    val nsp = namespace.getOrElse("spark.ssl")
+    val confMap: Map[String, String] = new HashMap[String, String]
+    conf.getAll.foreach(tuple => confMap.put(tuple._1, tuple._2))
+    confMap.put(s"$nsp.enabled", enabled.toString)
+    confMap.put(s"$nsp.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString)
+    confMap.put(s"$nsp.openSslEnabled", openSslEnabled.toString)
+    confMap.put(s"$nsp.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString)
+    keyStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.keyStore", _))
+    keyStorePassword.foreach(confMap.put(s"$nsp.keyStorePassword", _))
+    privateKey.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.privateKey", _))
+    keyPassword.foreach(confMap.put(s"$nsp.keyPassword", _))
+    certChain.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.certChain", _))
+    trustStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.trustStore", _))
+    trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _))
+    protocol.foreach(confMap.put(s"$nsp.protocol", _))
+    confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(","))
+
+    new MapConfigProvider(confMap)
+  }
+
   /** Returns a string representation of this SSLOptions with all the passwords masked. */
   override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
       s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-      s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+      s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
+      s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
+      s"certChain=$certChain, trustStore=$trustStore, " +
+      s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+      s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
+      s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
       s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
-
 }
 
 private[spark] object SSLOptions extends Logging {
@@ -152,13 +196,21 @@ private[spark] object SSLOptions extends Logging {
    * $ - `[ns].port` - the port where to bind the SSL server
    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
    * $ - `[ns].keyStorePassword` - a password to the key-store file
+   * $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
    * $ - `[ns].keyPassword` - a password to the private key
    * $ - `[ns].keyStoreType` - the type of the key-store
    * $ - `[ns].needClientAuth` - whether SSL needs client authentication
+   * $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
    *                         directory
    * $ - `[ns].trustStorePassword` - a password to the trust-store file
    * $ - `[ns].trustStoreType` - the type of trust-store
+   * $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
+   * that reloads its configuration when the trust-store file on disk changes
+   * $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, the
+   * trust-store will reload its configuration
+   * $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
+   * (if available on host system), requires certChain and keyFile arguments
    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
    *
@@ -180,7 +232,15 @@ private[spark] object SSLOptions extends Logging {
       hadoopConf: Configuration,
       ns: String,
       defaults: Option[SSLOptions] = None): SSLOptions = {
-    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+    // RPC does not inherit the default enabled setting due to backwards compatibility reasons
+    val enabledDefault = if (ns == "spark.ssl.rpc") {
+      false
+    } else {
+      defaults.exists(_.enabled)
+    }
+
+    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault)
     if (!enabled) {
       return new SSLOptions()
     }
@@ -194,15 +254,23 @@ private[spark] object SSLOptions extends Logging {
 
     val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
         .orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
+        .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty))
         .orElse(defaults.flatMap(_.keyStorePassword))
 
+    val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
+        .orElse(defaults.flatMap(_.privateKey))
+
     val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
         .orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
+        .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty))
         .orElse(defaults.flatMap(_.keyPassword))
 
     val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
         .orElse(defaults.flatMap(_.keyStoreType))
 
+    val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
+        .orElse(defaults.flatMap(_.certChain))
+
     val needClientAuth =
       conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))
 
@@ -211,11 +279,21 @@ private[spark] object SSLOptions extends Logging {
 
     val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
         .orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
+        .orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty))
         .orElse(defaults.flatMap(_.trustStorePassword))
 
     val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
         .orElse(defaults.flatMap(_.trustStoreType))
 
+    val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled",
+        defaultValue = defaults.exists(_.trustStoreReloadingEnabled))
+
+    val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs",
+      defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000))
+
+    val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled",
+        defaultValue = defaults.exists(_.openSslEnabled))
+
     val protocol = conf.getWithSubstitution(s"$ns.protocol")
         .orElse(defaults.flatMap(_.protocol))
 
@@ -225,19 +303,43 @@ private[spark] object SSLOptions extends Logging {
         .getOrElse(Set.empty)
 
     new SSLOptions(
+      Some(ns),
       enabled,
       port,
       keyStore,
       keyStorePassword,
+      privateKey,
       keyPassword,
       keyStoreType,
       needClientAuth,
+      certChain,
       trustStore,
       trustStorePassword,
       trustStoreType,
+      trustStoreReloadingEnabled,
+      trustStoreReloadIntervalMs,
+      openSslEnabled,
       protocol,
       enabledAlgorithms)
   }
 
+  // Config names and environment variables for propagating SSL passwords
+  val SPARK_RPC_SSL_KEY_PASSWORD_CONF = "spark.ssl.rpc.keyPassword"
+  val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword"
+  val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword"
+  val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq(
+    SPARK_RPC_SSL_KEY_PASSWORD_CONF,
+    SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF,
+    SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF
+  )
+
+  val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD"
+  val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD"
+  val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD"
+  val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq(
+    ENV_RPC_SSL_KEY_PASSWORD,
+    ENV_RPC_SSL_KEY_STORE_PASSWORD,
+    ENV_RPC_SSL_TRUST_STORE_PASSWORD
+  )
 }
 
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 7e72ae8d89e..821577f089a 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -85,6 +85,10 @@ private[spark] class SecurityManager(
   setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))
 
   private var secretKey: String = _
+
+  private val sslRpcEnabled = sparkConf.getBoolean(
+    "spark.ssl.rpc.enabled", false)
+
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
     "; users with view permissions: " +
@@ -94,12 +98,15 @@ private[spark] class SecurityManager(
     "; users with modify permissions: " +
     (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
     "; groups with modify permissions: " +
-    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))
+    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
+    "; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
   // the default SSL configuration - it will be used by all communication layers unless overwritten
   private val defaultSSLOptions =
     SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)
+  // the SSL configuration for RPCs
+  private val rpcSSLOptions = getSSLOptions("rpc")
 
   def getSSLOptions(module: String): SSLOptions = {
     val opts =
@@ -269,9 +276,22 @@ private[spark] class SecurityManager(
    * @return Whether to enable encryption when connecting to services that support it.
    */
   def isEncryptionEnabled(): Boolean = {
-    sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
+    val encryptionEnabled = sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) ||
+      sparkConf.get(SASL_ENCRYPTION_ENABLED)
+    if (encryptionEnabled && sslRpcEnabled) {
+      logWarning("Network encryption disabled as RPC SSL encryption is enabled")
+      false
+    } else {
+      encryptionEnabled
+    }
   }
 
+  /**
+   * Checks whether RPC SSL is enabled or not
+   * @return Whether RPC SSL is enabled or not
+   */
+  def isSslRpcEnabled(): Boolean = sslRpcEnabled
+
   /**
    * Gets the user used for authenticating SASL connections.
    * For now use a single hardcoded user.
@@ -391,6 +411,27 @@ private[spark] class SecurityManager(
   // Default SecurityManager only has a single secret key, so ignore appId.
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
+
+  /**
+   * If the RPC SSL settings are enabled, returns a map containing the password
+   * values so they can be passed to executors or other subprocesses.
+   *
+   * @return Map containing environment variables to pass
+   */
+  def getEnvironmentForSslRpcPasswords: Map[String, String] = {
+    if (rpcSSLOptions.enabled) {
+      val map = scala.collection.mutable.Map[String, String]()
+      rpcSSLOptions.keyPassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> password))
+      rpcSSLOptions.keyStorePassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> password))
+      rpcSSLOptions.trustStorePassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> password))
+      map.toMap
+    } else {
+      Map()
+    }
+  }
 }
 
 private[spark] object SecurityManager {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 091413bb9cc..b688604beea 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging {
     (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.rpc") ||
     name.startsWith("spark.network") ||
+    // We need SSL configs to propagate as they may be needed for RPCs.
+    // Passwords are propagated separately though.
+    (name.startsWith("spark.ssl") && !name.contains("Password")) ||
     isSparkPortConf(name)
   }
 
@@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging {
       key: String,
       version: String,
       translation: String => String = null)
-
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a4f05c76189..310dc828440 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -308,7 +308,7 @@ object SparkEnv extends Logging {
     }
 
     ioEncryptionKey.foreach { _ =>
-      if (!securityManager.isEncryptionEnabled()) {
+      if (!(securityManager.isEncryptionEnabled() || securityManager.isSslRpcEnabled())) {
         logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
           "wire.")
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 9a4a037e35c..d689eb02cc5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileOutputStream, InputStream, IOException}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SSLOptions}
 import org.apache.spark.deploy.Command
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.WorkerCommandBuilder
@@ -90,6 +90,8 @@ object CommandUtils extends Logging {
     if (securityMgr.isAuthenticationEnabled) {
       newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
     }
+    // set SSL env variables if needed
+    newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords
 
     Command(
       command.mainClass,
@@ -97,8 +99,13 @@ object CommandUtils extends Logging {
       newEnvironment,
       command.classPathEntries ++ classPath,
       Seq.empty, // library path already captured in environment variable
-      // filter out auth secret from java options
-      command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
+      // filter out secrets from java options
+      command.javaOpts.filterNot(opts =>
+        opts.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF) ||
+        SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists(
+          field => opts.startsWith("-D" + field)
+        )
+      ))
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 812d57ac67c..fc9e5201cfb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network.netty
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SSLOptions}
 import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf}
 
 /**
@@ -38,12 +38,14 @@ object SparkTransportConf {
    *                       This restriction will only occur if these properties are not already set.
    * @param role           optional role, could be driver, executor, worker and master. Default is
    *                      [[None]], means no role specific configurations.
+   * @param sslOptions SSL config options
    */
   def fromSparkConf(
       _conf: SparkConf,
       module: String,
       numUsableCores: Int = 0,
-      role: Option[String] = None): TransportConf = {
+      role: Option[String] = None,
+      sslOptions: Option[SSLOptions] = None): TransportConf = {
     val conf = _conf.clone
     // specify default thread configuration based on our JVM's allocation of cores (rather than
     // necessarily assuming we have all the machine's cores).
@@ -57,12 +59,14 @@ object SparkTransportConf {
       conf.set(s"spark.$module.io.$suffix", value)
     }
 
-    new TransportConf(module, new ConfigProvider {
-      override def get(name: String): String = conf.get(name)
-      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
-      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
-        conf.getAll.toMap.asJava.entrySet()
-      }
-    })
+    val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse(
+      new ConfigProvider {
+        override def get(name: String): String = conf.get(name)
+        override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
+        override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
+          conf.getAll.toMap.asJava.entrySet()
+        }
+      })
+    new TransportConf(module, configProvider)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 81bc4ae9da0..ee6bf071ef6 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -31,6 +31,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test resolving property file as spark conf ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
 
     // Pick two cipher suites that the provider knows about
     val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -47,8 +49,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStore", keyStorePath)
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+    conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000")
+    conf.set("spark.ssl.openSslEnabled", "false")
     conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
     conf.set("spark.ssl.protocol", "TLSv1.2")
 
@@ -62,7 +69,16 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === false)
+    assert(opts.trustStoreReloadIntervalMs === 10000)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStorePassword === Some("password"))
+    assert(opts.openSslEnabled === false)
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("TLSv1.2"))
     assert(opts.enabledAlgorithms === algorithms)
@@ -71,6 +87,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test resolving property with defaults specified ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
 
     val conf = new SparkConf
     val hadoopConf = new Configuration()
@@ -78,8 +96,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStore", keyStorePath)
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+    conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000")
+    conf.set("spark.ssl.openSslEnabled", "false")
     conf.set("spark.ssl.enabledAlgorithms",
       "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
@@ -91,12 +114,22 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore.isDefined)
     assert(opts.trustStore.get.getName === "truststore")
     assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStore.isDefined)
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === false)
+    assert(opts.trustStoreReloadIntervalMs === 10000)
+    assert(opts.openSslEnabled === false)
     assert(opts.protocol === Some("SSLv3"))
     assert(opts.enabledAlgorithms ===
       Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
@@ -105,6 +138,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test whether defaults can be overridden ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
 
     val conf = new SparkConf
     val hadoopConf = new Configuration()
@@ -115,8 +150,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.ui.keyStorePassword", "12345")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.ui.trustStoreReloadingEnabled", "true")
+    conf.set("spark.ssl.ui.trustStoreReloadIntervalMs", "20000")
+    conf.set("spark.ssl.ui.openSslEnabled", "true")
     conf.set("spark.ssl.enabledAlgorithms",
       "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.ui.enabledAlgorithms", "ABC, DEF")
@@ -130,16 +170,88 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore.isDefined)
     assert(opts.trustStore.get.getName === "truststore")
     assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
+    assert(opts.keyStore.isDefined)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("12345"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === true)
+    assert(opts.trustStoreReloadIntervalMs === 20000)
+    assert(opts.openSslEnabled === true)
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
+  }
+
+  test("ensure RPC settings don't get enabled via inheritance ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    val hadoopConf = new Configuration()
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.rpc.port", "4242")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.rpc.keyStorePassword", "12345")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.rpc.trustStoreReloadingEnabled", "true")
+    conf.set("spark.ssl.rpc.trustStoreReloadIntervalMs", "20000")
+    conf.set("spark.ssl.rpc.openSslEnabled", "true")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.rpc.enabledAlgorithms", "ABC, DEF")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val disabledDefaults = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    val disabledOpts = SSLOptions.parse(
+      conf, hadoopConf, "spark.ssl.rpc", defaults = Some(disabledDefaults))
+
+    assert(disabledOpts.enabled === false)
+    assert(disabledOpts.port.isEmpty)
+
+    // Now enable it and test again
+    conf.set("spark.ssl.rpc.enabled", "true")
+    val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === true)
+    assert(opts.port === Some(4242))
+    assert(opts.trustStore.isDefined)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStore.isDefined)
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
     assert(opts.keyStorePassword === Some("12345"))
     assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === true)
+    assert(opts.trustStoreReloadIntervalMs === 20000)
+    assert(opts.openSslEnabled === true)
     assert(opts.protocol === Some("SSLv3"))
     assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
   }
 
+
   test("SPARK-41719: Skip ssl sub-settings if ssl is disabled") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val conf = new SparkConf
@@ -169,6 +281,21 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore === Some(new File("val2")))
   }
 
+  test("get passwords from environment") {
+    val conf = new SparkConfWithEnv(Map(
+      SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> "val1",
+      SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> "val2",
+      SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> "val3"))
+    val hadoopConf = new Configuration()
+
+    conf.set("spark.ssl.enabled", "true")
+
+    val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    assert(opts.keyPassword === Some("val1"))
+    assert(opts.keyStorePassword === Some("val2"))
+    assert(opts.trustStorePassword === Some("val3"))
+  }
+
   test("get password from Hadoop credential provider") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a2d41b92e08..5f795816750 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -341,7 +341,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     }
   }
 
-  test("SPARK-26998: SSL configuration not needed on executors") {
+  test("SPARK-26998: SSL passwords not needed on executors") {
     val conf = new SparkConf(false)
     conf.set("spark.ssl.enabled", "true")
     conf.set("spark.ssl.keyPassword", "password")
@@ -349,7 +349,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     conf.set("spark.ssl.trustStorePassword", "password")
 
     val filtered = conf.getAll.filter { case (k, _) => SparkConf.isExecutorStartupConf(k) }
-    assert(filtered.isEmpty)
+    // Only the enabled flag should propagate
+    assert(filtered.length == 1)
+    assert(filtered(0)._1 == "spark.ssl.enabled")
   }
 
   test("SPARK-27244 toDebugString redacts sensitive information") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
index 28e35bc8183..e864b609d0e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
@@ -21,8 +21,9 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite, SSLOptions}
 import org.apache.spark.deploy.Command
+import org.apache.spark.network.ssl.SslSampleConfigs
 import org.apache.spark.util.Utils
 
 class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTester {
@@ -68,4 +69,29 @@ class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
     assert(cmd.environment(SecurityManager.ENV_AUTH_SECRET) === secret)
   }
+
+  test("SSL RPC passwords shouldn't appear in java opts") {
+    val buildLocalCommand = PrivateMethod[Command](Symbol("buildLocalCommand"))
+    val conf = new SparkConf
+    conf.set("spark.ssl.rpc.enabled", "true")
+
+    // This sets passwords
+    val updatedConfigs = SslSampleConfigs.createDefaultConfigMapForRpcNamespace()
+    updatedConfigs.entrySet().forEach(entry => conf.set(entry.getKey, entry.getValue))
+
+    val secret = "This is the secret sauce"
+    val command = Command("mainClass", Seq(), Map(), Seq(), Seq("lib"),
+      SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.map(
+        field => "-D" + field + "=" + secret
+      ))
+
+    val cmd = CommandUtils invokePrivate buildLocalCommand(
+      command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
+    SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.foreach(
+      field => assert(!cmd.javaOpts.exists(_.startsWith("-D" + field)))
+    )
+    SSLOptions.SPARK_RPC_SSL_PASSWORD_ENVS.foreach(
+      env => assert(cmd.environment(env) === "password")
+    )
+  }
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 691425cfc5d..47fd7881d2f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,7 +41,9 @@ object MimaExcludes {
     // [SPARK-44705][PYTHON] Make PythonRunner single-threaded
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.python.BasePythonRunner#ReaderIterator.this"),
     // [SPARK-44198][CORE] Support propagation of the log level to the executors
-    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$")
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$"),
+    // [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf")
   )
 
   // Default exclude rules

