You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sa...@apache.org on 2021/04/29 04:59:06 UTC

[spark] branch branch-3.1 updated: [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources

This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c245d84  [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources
c245d84 is described below

commit c245d84066785be60ea0fcdbf69f2e09361b5cf4
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Thu Apr 29 13:55:53 2021 +0900

    [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce a new JDBC option `refreshKrb5Config`, which allows changes to `krb5.conf` to be reflected.
    
    ### Why are the changes needed?
    
    In the current master, JDBC datasources can't accept the `refreshKrb5Config` option defined by `Krb5LoginModule`.
    So even if we change `krb5.conf` after establishing a connection, the change will not be reflected.
    
    A similar issue happens when we run multiple `*KrbIntegrationSuites` at the same time.
    `MiniKDC` starts and stops for every KerberosIntegrationSuite, and a different port number is recorded in `krb5.conf` each time.
    Because `SecureConnectionProvider.JDBCConfiguration` doesn't take `refreshKrb5Config`, every KerberosIntegrationSuite except the first one to run sees the wrong port, so those suites fail.
    You can easily confirm this with the following command.
    ```
    build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
    ```
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can set `refreshKrb5Config` to refresh the krb5-related configuration.
    
    ### How was this patch tested?
    
    New test.
    
    Closes #32344 from sarutak/kerberos-refresh-issue.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
    (cherry picked from commit 529b875901a91a03caeb73d9eb7b3008b552c736)
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
 docs/sql-data-sources-jdbc.md                      | 19 ++++++++
 .../spark/sql/jdbc/DB2KrbIntegrationSuite.scala    |  2 +-
 .../sql/jdbc/DockerKrbJDBCIntegrationSuite.scala   | 50 ++++++++++++++++++++++
 .../sql/jdbc/MariaDBKrbIntegrationSuite.scala      |  2 +-
 .../sql/jdbc/PostgresKrbIntegrationSuite.scala     |  2 +-
 .../execution/datasources/jdbc/JDBCOptions.scala   |  3 ++
 .../jdbc/connection/SecureConnectionProvider.scala |  9 ++--
 7 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 7d60915..89a025c 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -211,6 +211,25 @@ the following case-insensitive options:
      Specifies kerberos principal name for the JDBC client. If both <code>keytab</code> and <code>principal</code> are defined then Spark tries to do kerberos authentication.
     </td>
   </tr>
+
+  <tr>
+    <td><code>refreshKrb5Config</code></td>
+    <td>
+      This option controls whether the Kerberos configuration is to be refreshed or not for the JDBC client before
+      establishing a new connection. Set to true if you want to refresh the configuration, otherwise set to false.
+      The default value is false. Note that if you set this option to true and try to establish multiple connections,
+      a race condition can occur. One possible situation is as follows.
+      <ol>
+        <li>refreshKrb5Config flag is set with security context 1</li>
+        <li>A JDBC connection provider is used for the corresponding DBMS</li>
+        <li>The krb5.conf is modified but the JVM has not yet realized that it must be reloaded</li>
+        <li>Spark authenticates successfully for security context 1</li>
+        <li>The JVM loads security context 2 from the modified krb5.conf</li>
+        <li>Spark restores the previously saved security context 1</li>
+        <li>The modified krb5.conf content is lost</li>
+      </ol>
+    </td>
+  </tr>  
 </table>
 
 <div class="codetabs">
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala
index 5cbe6fa..f79809f 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala
@@ -81,7 +81,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
 
   override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
     val config = new SecureConnectionProvider.JDBCConfiguration(
-      Configuration.getConfiguration, "JaasClient", keytabFile, principal)
+      Configuration.getConfiguration, "JaasClient", keytabFile, principal, true)
     Configuration.setConfiguration(config)
   }
 
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala
index c20c006..4a828ae 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.util.{SecurityUtils, Utils}
 
 abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite {
   private var kdc: MiniKdc = _
+  private val KRB5_CONF_PROP = "java.security.krb5.conf"
   protected var entryPointDir: File = _
   protected var initDbDir: File = _
   protected val userName: String
@@ -160,4 +161,53 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
     assert(rows(0).getString(0) === "foo")
     assert(rows(0).getString(1) === "bar")
   }
+
+  test("SPARK-35226: JDBCOption should accept refreshKrb5Config parameter") {
+    // This makes sure Spark must do authentication
+    Configuration.setConfiguration(null)
+    withTempDir { dir =>
+      val dummyKrb5Conf = File.createTempFile("dummy", "krb5.conf", dir)
+      val origKrb5Conf = sys.props(KRB5_CONF_PROP)
+      try {
+        // Set dummy krb5.conf and refresh config so this assertion is expected to fail.
+        // The thrown exception is dependent on the actual JDBC driver class.
+        intercept[Exception] {
+          sys.props(KRB5_CONF_PROP) = dummyKrb5Conf.getAbsolutePath
+          spark.read.format("jdbc")
+            .option("url", jdbcUrl)
+            .option("keytab", keytabFullPath)
+            .option("principal", principal)
+            .option("refreshKrb5Config", "true")
+            .option("query", "SELECT 1")
+            .load()
+        }
+
+        // Set the authentic krb5.conf but doesn't refresh config
+        // so this assertion is expected to fail.
+        intercept[Exception] {
+          sys.props(KRB5_CONF_PROP) = origKrb5Conf
+          spark.read.format("jdbc")
+            .option("url", jdbcUrl)
+            .option("keytab", keytabFullPath)
+            .option("principal", principal)
+            .option("query", "SELECT 1")
+            .load()
+        }
+
+        sys.props(KRB5_CONF_PROP) = origKrb5Conf
+        val df = spark.read.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("keytab", keytabFullPath)
+          .option("principal", principal)
+          .option("refreshKrb5Config", "true")
+          .option("query", "SELECT 1")
+          .load()
+        val result = df.collect().map(_.getInt(0))
+        assert(result.length === 1)
+        assert(result(0) === 1)
+      } finally {
+        sys.props(KRB5_CONF_PROP) = origKrb5Conf
+      }
+    }
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
index 59a6f53..9b653f8 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
@@ -68,7 +68,7 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
 
   override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
     val config = new SecureConnectionProvider.JDBCConfiguration(
-      Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal)
+      Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal, true)
     Configuration.setConfiguration(config)
   }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala
index 984890f..1198ba8 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala
@@ -61,7 +61,7 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
 
   override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
     val config = new SecureConnectionProvider.JDBCConfiguration(
-      Configuration.getConfiguration, "pgjdbc", keytabFile, principal)
+      Configuration.getConfiguration, "pgjdbc", keytabFile, principal, true)
     Configuration.setConfiguration(config)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6e8b7ea..4cb221a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -208,6 +208,8 @@ class JDBCOptions(
   val principal = parameters.getOrElse(JDBC_PRINCIPAL, null)
 
   val tableComment = parameters.getOrElse(JDBC_TABLE_COMMENT, "").toString
+
+  val refreshKrb5Config = parameters.getOrElse(JDBC_REFRESH_KRB5_CONFIG, "false").toBoolean
 }
 
 class JdbcOptionsInWrite(
@@ -263,4 +265,5 @@ object JDBCOptions {
   val JDBC_KEYTAB = newOption("keytab")
   val JDBC_PRINCIPAL = newOption("principal")
   val JDBC_TABLE_COMMENT = newOption("tableComment")
+  val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala
index 4138c72..71c20e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala
@@ -52,7 +52,8 @@ private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionPro
   private[connection] def setAuthenticationConfig(driver: Driver, options: JDBCOptions) = {
     val parent = Configuration.getConfiguration
     val config = new SecureConnectionProvider.JDBCConfiguration(
-      parent, appEntry(driver, options), options.keytab, options.principal)
+      parent, appEntry(driver, options), options.keytab,
+      options.principal, options.refreshKrb5Config)
     logDebug("Adding database specific security configuration")
     Configuration.setConfiguration(config)
   }
@@ -63,7 +64,8 @@ object SecureConnectionProvider {
     parent: Configuration,
     appEntry: String,
     keytab: String,
-    principal: String) extends Configuration {
+    principal: String,
+    refreshKrb5Config: Boolean) extends Configuration {
   val entry =
     new AppConfigurationEntry(
       SecurityUtils.getKrb5LoginModuleName(),
@@ -73,7 +75,8 @@ object SecureConnectionProvider {
         "useKeyTab" -> "true",
         "keyTab" -> keytab,
         "principal" -> principal,
-        "debug" -> "true"
+        "debug" -> "true",
+        "refreshKrb5Config" -> refreshKrb5Config.toString
       ).asJava
     )
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org