Posted to commits@spark.apache.org by va...@apache.org on 2020/03/13 02:05:12 UTC

[spark] branch master updated: [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 231e650  [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector
231e650 is described below

commit 231e65092fa97516e30c4ef12e635bfe3e97c7f0
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Mar 12 19:04:35 2020 -0700

    [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector
    
    ### What changes were proposed in this pull request?
    When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (yarn client/cluster and similar modes) fail to establish a connection because they have no Kerberos ticket and no way to generate one.
    
    This is a real issue when ingesting data from kerberized data sources (SQL Server, Oracle) in enterprise environments where exposing simple authentication is not an option because of IT policy.
    
    In this PR I've added Postgres support (support for other databases will come in later PRs).
    
    What this PR contains:
    * Added `keytab` and `principal` JDBC options
    * Added a `ConnectionProvider` trait and its implementations (see the condensed sketch after this list):
      * `BasicConnectionProvider` => unsecured connection
      * `PostgresConnectionProvider` => secure Postgres connection
    * Added `ConnectionProvider` tests
    * Added `PostgresKrbIntegrationSuite` docker integration test
    * Created `SecurityUtils` to collect reusable security-related functionality
    * Documentation
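    
    Condensed from the new `ConnectionProvider.scala` below (the trait, both implementations, imports and logging are in the full diff), provider selection is driven purely by whether both new options are set:
    
    ```scala
    private[jdbc] object ConnectionProvider {
      def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
        if (options.keytab == null || options.principal == null) {
          // No keytab/principal given: plain driver.connect() with the configured properties.
          new BasicConnectionProvider(driver, options)
        } else {
          // Kerberos requested: pick a database-specific provider by driver class.
          options.driverClass match {
            case PostgresConnectionProvider.driverClass =>
              new PostgresConnectionProvider(driver, options)
            case _ =>
              throw new IllegalArgumentException(
                s"Driver ${options.driverClass} does not support Kerberos authentication")
          }
        }
      }
    }
    ```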
    
    ### Why are the changes needed?
    JDBC Kerberos support is missing.
    
    ### Does this PR introduce any user-facing change?
    Yes, two additional JDBC options are added:
    * keytab
    * principal
    
    If both are provided, Spark performs Kerberos authentication.
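    
    For example, a minimal sketch of a Kerberos-authenticated read (the URL, keytab path and principal here are illustrative placeholders, not values taken from this patch):
    
    ```scala
    // Placeholders only: point these at a real kerberized Postgres instance.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db.example.com:5432/postgres?gsslib=gssapi")
      .option("query", "SELECT c0 FROM bar")
      .option("keytab", "/path/to/dbuser.keytab")
      .option("principal", "dbuser@EXAMPLE.COM")
      .load()
    ```
    
    The same two options work on the write path when passed as connection properties, as exercised by the `PostgresKrbIntegrationSuite` added in this patch.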
    
    ### How was this patch tested?
    To demonstrate the functionality with a standalone application I've created this repository: https://github.com/gaborgsomogyi/docker-kerberos
    
    * Additional + existing unit tests
    * Additional docker integration test
    * Manual test on cluster
    * `SKIP_API=1 jekyll build`
    
    Closes #27637 from gaborgsomogyi/SPARK-30874.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@apache.org>
---
 .../org/apache/spark/util/SecurityUtils.scala      |  69 +++++++++++
 docs/sql-data-sources-jdbc.md                      |  14 +++
 external/docker-integration-tests/pom.xml          |   5 +
 .../src/test/resources/log4j.properties            |  36 ++++++
 .../src/test/resources/postgres_krb_setup.sh       |  21 ++++
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      |  15 ++-
 .../sql/jdbc/DockerKrbJDBCIntegrationSuite.scala   |  94 +++++++++++++++
 .../sql/jdbc/PostgresKrbIntegrationSuite.scala     | 129 +++++++++++++++++++++
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  30 +----
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala |  32 +----
 .../execution/datasources/jdbc/JDBCOptions.scala   |  25 +++-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |   3 +-
 .../jdbc/connection/BasicConnectionProvider.scala  |  29 +++++
 .../jdbc/connection/ConnectionProvider.scala       |  52 +++++++++
 .../connection/PostgresConnectionProvider.scala    |  82 +++++++++++++
 .../PostgresConnectionProviderSuite.scala          |  85 ++++++++++++++
 16 files changed, 663 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala b/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala
new file mode 100644
index 0000000..7831f66
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Various utility methods used by Spark Security.
+ */
+private[spark] object SecurityUtils {
+  private val JAVA_VENDOR = "java.vendor"
+  private val IBM_KRB_DEBUG_CONFIG = "com.ibm.security.krb5.Krb5Debug"
+  private val SUN_KRB_DEBUG_CONFIG = "sun.security.krb5.debug"
+
+  def setGlobalKrbDebug(enabled: Boolean): Unit = {
+    if (enabled) {
+      if (isIBMVendor()) {
+        System.setProperty(IBM_KRB_DEBUG_CONFIG, "all")
+      } else {
+        System.setProperty(SUN_KRB_DEBUG_CONFIG, "true")
+      }
+    } else {
+      if (isIBMVendor()) {
+        System.clearProperty(IBM_KRB_DEBUG_CONFIG)
+      } else {
+        System.clearProperty(SUN_KRB_DEBUG_CONFIG)
+      }
+    }
+  }
+
+  def isGlobalKrbDebugEnabled(): Boolean = {
+    if (isIBMVendor()) {
+      val debug = System.getenv(IBM_KRB_DEBUG_CONFIG)
+      debug != null && debug.equalsIgnoreCase("all")
+    } else {
+      val debug = System.getenv(SUN_KRB_DEBUG_CONFIG)
+      debug != null && debug.equalsIgnoreCase("true")
+    }
+  }
+
+  /**
+   * Krb5LoginModule package varies in different JVMs.
+   * Please see Hadoop UserGroupInformation for further details.
+   */
+  def getKrb5LoginModuleName(): String = {
+    if (isIBMVendor()) {
+      "com.ibm.security.auth.module.Krb5LoginModule"
+    } else {
+      "com.sun.security.auth.module.Krb5LoginModule"
+    }
+  }
+
+  private def isIBMVendor(): Boolean = {
+    System.getProperty(JAVA_VENDOR).contains("IBM")
+  }
+}
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 3cdff42..10d52bf 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -197,6 +197,20 @@ the following case-insensitive options:
      The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
     </td>
   </tr>
+
+  <tr>
+    <td><code>keytab</code></td>
+    <td>
+     Location of the kerberos keytab file (which must be pre-uploaded to all nodes either by the <code>--files</code> option of spark-submit or manually) for the JDBC client. If the value contains path information, Spark considers the keytab distributed manually; otherwise <code>--files</code> is assumed. If both <code>keytab</code> and <code>principal</code> are defined, Spark tries to do kerberos authentication.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>principal</code></td>
+    <td>
+     Specifies the kerberos principal name for the JDBC client. If both <code>keytab</code> and <code>principal</code> are defined, Spark tries to do kerberos authentication.
+    </td>
+  </tr>
 </table>
 
 <div class="codetabs">
diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
index d3b1399..c357a2f 100644
--- a/external/docker-integration-tests/pom.xml
+++ b/external/docker-integration-tests/pom.xml
@@ -106,6 +106,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!-- Although SPARK-28737 upgraded Jersey to 2.29 for JDK11, 'com.spotify.docker-client' still
       uses this repackaged 'jersey-guava'. We add this back for JDK8/JDK11 testing. -->
     <dependency>
diff --git a/external/docker-integration-tests/src/test/resources/log4j.properties b/external/docker-integration-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..21fcd34
--- /dev/null
+++ b/external/docker-integration-tests/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.sparkproject.jetty=WARN
diff --git a/external/docker-integration-tests/src/test/resources/postgres_krb_setup.sh b/external/docker-integration-tests/src/test/resources/postgres_krb_setup.sh
new file mode 100755
index 0000000..daaaf8c
--- /dev/null
+++ b/external/docker-integration-tests/src/test/resources/postgres_krb_setup.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sed -i 's/host all all all md5/host all all all gss/g' /var/lib/postgresql/data/pg_hba.conf
+echo "krb_server_keyfile='/docker-entrypoint-initdb.d/postgres.keytab'" >> /var/lib/postgresql/data/postgresql.conf
+psql -U postgres -c "CREATE ROLE \"postgres/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM\" LOGIN SUPERUSER"
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 519a5cd..cd26fb3 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -62,10 +62,18 @@ abstract class DatabaseOnDocker {
    * Optional process to run when container starts
    */
   def getStartupProcessName: Option[String]
+
+  /**
+   * Optional step before container starts
+   */
+  def beforeContainerStart(
+      hostConfigBuilder: HostConfig.Builder,
+      containerConfigBuilder: ContainerConfig.Builder): Unit = {}
 }
 
 abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventually {
 
+  protected val dockerIp = DockerUtils.getDockerIp()
   val db: DatabaseOnDocker
 
   private var docker: DockerClient = _
@@ -99,24 +107,23 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
         sock.close()
         port
       }
-      val dockerIp = DockerUtils.getDockerIp()
-      val hostConfig: HostConfig = HostConfig.builder()
+      val hostConfigBuilder = HostConfig.builder()
         .networkMode("bridge")
         .ipcMode(if (db.usesIpc) "host" else "")
         .portBindings(
           Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
-        .build()
       // Create the database container:
       val containerConfigBuilder = ContainerConfig.builder()
         .image(db.imageName)
         .networkDisabled(false)
         .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
-        .hostConfig(hostConfig)
         .exposedPorts(s"${db.jdbcPort}/tcp")
       if(db.getStartupProcessName.isDefined) {
         containerConfigBuilder
         .cmd(db.getStartupProcessName.get)
       }
+      db.beforeContainerStart(hostConfigBuilder, containerConfigBuilder)
+      containerConfigBuilder.hostConfig(hostConfigBuilder.build())
       val config = containerConfigBuilder.build()
       // Create the database container:
       containerId = docker.createContainer(config).id
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala
new file mode 100644
index 0000000..583d8108
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.io.{File, FileInputStream, FileOutputStream}
+import javax.security.auth.login.Configuration
+
+import scala.io.Source
+
+import org.apache.hadoop.minikdc.MiniKdc
+
+import org.apache.spark.util.{SecurityUtils, Utils}
+
+abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite {
+  private var kdc: MiniKdc = _
+  protected var workDir: File = _
+  protected val userName: String
+  protected var principal: String = _
+  protected val keytabFileName: String
+  protected var keytabFullPath: String = _
+  protected def setAuthentication(keytabFile: String, principal: String): Unit
+
+  override def beforeAll(): Unit = {
+    SecurityUtils.setGlobalKrbDebug(true)
+
+    val kdcDir = Utils.createTempDir()
+    val kdcConf = MiniKdc.createConf()
+    kdcConf.setProperty(MiniKdc.DEBUG, "true")
+    kdc = new MiniKdc(kdcConf, kdcDir)
+    kdc.start()
+
+    principal = s"$userName@${kdc.getRealm}"
+
+    workDir = Utils.createTempDir()
+    val keytabFile = new File(workDir, keytabFileName)
+    keytabFullPath = keytabFile.getAbsolutePath
+    kdc.createPrincipal(keytabFile, userName)
+    logInfo(s"Created keytab file: $keytabFullPath")
+
+    setAuthentication(keytabFullPath, principal)
+
+    // This must be executed intentionally later
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (kdc != null) {
+        kdc.stop()
+      }
+      Configuration.setConfiguration(null)
+      SecurityUtils.setGlobalKrbDebug(false)
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  protected def copyExecutableResource(
+      fileName: String, dir: File, processLine: String => String) = {
+    val newEntry = new File(dir.getAbsolutePath, fileName)
+    newEntry.createNewFile()
+    Utils.tryWithResource(
+      new FileInputStream(getClass.getClassLoader.getResource(fileName).getFile)
+    ) { inputStream =>
+      val outputStream = new FileOutputStream(newEntry)
+      try {
+        for (line <- Source.fromInputStream(inputStream).getLines()) {
+          val processedLine = processLine(line) + System.lineSeparator()
+          outputStream.write(processedLine.getBytes)
+        }
+      } finally {
+        outputStream.close()
+      }
+    }
+    newEntry.setExecutable(true, false)
+    logInfo(s"Created executable resource file: ${newEntry.getAbsolutePath}")
+    newEntry
+  }
+}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala
new file mode 100644
index 0000000..721a488
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+import java.util.Properties
+import javax.security.auth.login.Configuration
+
+import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
+  override protected val userName = s"postgres/$dockerIp"
+  override protected val keytabFileName = "postgres.keytab"
+
+  override val db = new DatabaseOnDocker {
+    override val imageName = "postgres:12.0"
+    override val env = Map(
+      "POSTGRES_PASSWORD" -> "rootpass"
+    )
+    override val usesIpc = false
+    override val jdbcPort = 5432
+
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:postgresql://$ip:$port/postgres?user=$principal&gsslib=gssapi"
+
+    override def getStartupProcessName: Option[String] = None
+
+    override def beforeContainerStart(
+        hostConfigBuilder: HostConfig.Builder,
+        containerConfigBuilder: ContainerConfig.Builder): Unit = {
+      def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)
+      copyExecutableResource("postgres_krb_setup.sh", workDir, replaceIp)
+
+      hostConfigBuilder.appendBinds(
+        HostConfig.Bind.from(workDir.getAbsolutePath)
+          .to("/docker-entrypoint-initdb.d").readOnly(true).build()
+      )
+    }
+  }
+
+  override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
+    val config = new PostgresConnectionProvider.PGJDBCConfiguration(
+      Configuration.getConfiguration, "pgjdbc", keytabFile, principal)
+    Configuration.setConfiguration(config)
+  }
+
+  override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
+    conn.setCatalog("foo")
+    conn.prepareStatement("CREATE TABLE bar (c0 text)").executeUpdate()
+    conn.prepareStatement("INSERT INTO bar VALUES ('hello')").executeUpdate()
+  }
+
+  test("Basic read test in query option") {
+    // This makes sure Spark must do authentication
+    Configuration.setConfiguration(null)
+
+    val expectedResult = Set("hello").map(Row(_))
+
+    val query = "SELECT c0 FROM bar"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("keytab", keytabFullPath)
+      .option("principal", principal)
+      .option("query", query)
+      .load()
+    assert(df.collect().toSet === expectedResult)
+  }
+
+  test("Basic read test in create table path") {
+    // This makes sure Spark must do authentication
+    Configuration.setConfiguration(null)
+
+    val expectedResult = Set("hello").map(Row(_))
+
+    val query = "SELECT c0 FROM bar"
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query', keytab '$keytabFullPath', principal '$principal')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select c0 from queryOption").collect().toSet === expectedResult)
+  }
+
+  test("Basic write test") {
+    // This makes sure Spark must do authentication
+    Configuration.setConfiguration(null)
+
+    val props = new Properties
+    props.setProperty("keytab", keytabFullPath)
+    props.setProperty("principal", principal)
+
+    val tableName = "write_test"
+    sqlContext.createDataFrame(Seq(("foo", "bar")))
+      .write.jdbc(jdbcUrl, tableName, props)
+    val df = sqlContext.read.jdbc(jdbcUrl, tableName, props)
+
+    val schema = df.schema
+    assert(schema.map(_.dataType).toSeq === Seq(StringType, StringType))
+    val rows = df.collect()
+    assert(rows.length === 1)
+    assert(rows(0).getString(0) === "foo")
+    assert(rows(0).getString(1) === "bar")
+  }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7b972fe..90fb188 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -54,7 +54,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaTokenUtil
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils}
 
 /**
  * This is a helper class for Kafka test suites. This has the functionality to set up
@@ -67,8 +67,6 @@ class KafkaTestUtils(
     secure: Boolean = false) extends Logging {
 
   private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"
-  private val IBM_KRB_DEBUG_CONFIG = "com.ibm.security.krb5.Krb5Debug"
-  private val SUN_KRB_DEBUG_CONFIG = "sun.security.krb5.debug"
 
   private val localCanonicalHostName = InetAddress.getLoopbackAddress().getCanonicalHostName()
   logInfo(s"Local host name is $localCanonicalHostName")
@@ -204,7 +202,7 @@ class KafkaTestUtils(
     val content =
       s"""
       |Server {
-      |  ${KafkaTokenUtil.getKrb5LoginModuleName} required
+      |  ${SecurityUtils.getKrb5LoginModuleName()} required
       |  useKeyTab=true
       |  storeKey=true
       |  useTicketCache=false
@@ -214,7 +212,7 @@ class KafkaTestUtils(
       |};
       |
       |Client {
-      |  ${KafkaTokenUtil.getKrb5LoginModuleName} required
+      |  ${SecurityUtils.getKrb5LoginModuleName()} required
       |  useKeyTab=true
       |  storeKey=true
       |  useTicketCache=false
@@ -224,7 +222,7 @@ class KafkaTestUtils(
       |};
       |
       |KafkaServer {
-      |  ${KafkaTokenUtil.getKrb5LoginModuleName} required
+      |  ${SecurityUtils.getKrb5LoginModuleName()} required
       |  serviceName="$brokerServiceName"
       |  useKeyTab=true
       |  storeKey=true
@@ -279,7 +277,7 @@ class KafkaTestUtils(
     }
 
     if (secure) {
-      setupKrbDebug()
+      SecurityUtils.setGlobalKrbDebug(true)
       setUpMiniKdc()
       val jaasConfigFile = createKeytabsAndJaasConfigFile()
       System.setProperty(JAVA_AUTH_CONFIG, jaasConfigFile)
@@ -294,14 +292,6 @@ class KafkaTestUtils(
     }
   }
 
-  private def setupKrbDebug(): Unit = {
-    if (System.getProperty("java.vendor").contains("IBM")) {
-      System.setProperty(IBM_KRB_DEBUG_CONFIG, "all")
-    } else {
-      System.setProperty(SUN_KRB_DEBUG_CONFIG, "true")
-    }
-  }
-
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
     if (leakDetector != null) {
@@ -353,15 +343,7 @@ class KafkaTestUtils(
       kdc.stop()
     }
     UserGroupInformation.reset()
-    teardownKrbDebug()
-  }
-
-  private def teardownKrbDebug(): Unit = {
-    if (System.getProperty("java.vendor").contains("IBM")) {
-      System.clearProperty(IBM_KRB_DEBUG_CONFIG)
-    } else {
-      System.clearProperty(SUN_KRB_DEBUG_CONFIG)
-    }
+    SecurityUtils.setGlobalKrbDebug(false)
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 49109d3..307a69f 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SecurityUtils, Utils}
 import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] object KafkaTokenUtil extends Logging {
@@ -190,8 +190,8 @@ private[spark] object KafkaTokenUtil extends Logging {
       kerberosServiceName: String): String = {
     val params =
       s"""
-      |${getKrb5LoginModuleName} required
-      | debug=${isGlobalKrbDebugEnabled()}
+      |${SecurityUtils.getKrb5LoginModuleName} required
+      | debug=${SecurityUtils.isGlobalKrbDebugEnabled()}
       | useKeyTab=true
       | serviceName="$kerberosServiceName"
       | keyTab="$keyTab"
@@ -204,8 +204,8 @@ private[spark] object KafkaTokenUtil extends Logging {
   private def getTicketCacheJaasParams(clusterConf: KafkaTokenClusterConf): String = {
     val params =
       s"""
-      |${getKrb5LoginModuleName} required
-      | debug=${isGlobalKrbDebugEnabled()}
+      |${SecurityUtils.getKrb5LoginModuleName} required
+      | debug=${SecurityUtils.isGlobalKrbDebugEnabled()}
       | useTicketCache=true
       | serviceName="${clusterConf.kerberosServiceName}";
       """.stripMargin.replace("\n", "").trim
@@ -213,28 +213,6 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
 
-  /**
-   * Krb5LoginModule package vary in different JVMs.
-   * Please see Hadoop UserGroupInformation for further details.
-   */
-  def getKrb5LoginModuleName(): String = {
-    if (System.getProperty("java.vendor").contains("IBM")) {
-      "com.ibm.security.auth.module.Krb5LoginModule"
-    } else {
-      "com.sun.security.auth.module.Krb5LoginModule"
-    }
-  }
-
-  private def isGlobalKrbDebugEnabled(): Boolean = {
-    if (System.getProperty("java.vendor").contains("IBM")) {
-      val debug = System.getenv("com.ibm.security.krb5.Krb5Debug")
-      debug != null && debug.equalsIgnoreCase("all")
-    } else {
-      val debug = System.getenv("sun.security.krb5.debug")
-      debug != null && debug.equalsIgnoreCase("true")
-    }
-  }
-
   private def printToken(token: DelegationToken): Unit = {
     if (log.isDebugEnabled) {
       val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 222ef11..9e0438c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,15 +20,18 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, DriverManager}
 import java.util.{Locale, Properties}
 
+import org.apache.commons.io.FilenameUtils
+
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.types.StructType
 
 /**
  * Options for the JDBC data source.
  */
 class JDBCOptions(
     @transient val parameters: CaseInsensitiveMap[String])
-  extends Serializable {
+  extends Serializable with Logging {
 
   import JDBCOptions._
 
@@ -187,6 +190,22 @@ class JDBCOptions(
 
   // An option to allow/disallow pushing down predicate into JDBC data source
   val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
+
+  // The local path of user's keytab file, which is assumed to be pre-uploaded to all nodes either
+  // by --files option of spark-submit or manually
+  val keytab = {
+    val keytabParam = parameters.getOrElse(JDBC_KEYTAB, null)
+    if (keytabParam != null && FilenameUtils.getPath(keytabParam).isEmpty) {
+      val result = SparkFiles.get(keytabParam)
+      logDebug(s"Keytab path not found, assuming --files, file name used on executor: $result")
+      result
+    } else {
+      logDebug("Keytab path found, assuming manual upload")
+      keytabParam
+    }
+  }
+  // The principal name of user's keytab file
+  val principal = parameters.getOrElse(JDBC_PRINCIPAL, null)
 }
 
 class JdbcOptionsInWrite(
@@ -239,4 +258,6 @@ object JDBCOptions {
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
   val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
+  val JDBC_KEYTAB = newOption("keytab")
+  val JDBC_PRINCIPAL = newOption("principal")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index c1e1aed..7a73ad50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -61,7 +62,7 @@ object JdbcUtils extends Logging {
         throw new IllegalStateException(
           s"Did not find registered driver with class $driverClass")
       }
-      val connection: Connection = driver.connect(options.url, options.asConnectionProperties)
+      val connection = ConnectionProvider.create(driver, options).getConnection()
       require(connection != null,
         s"The driver could not open a JDBC connection. Check the URL: ${options.url}")
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala
new file mode 100644
index 0000000..a5e3fbd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+import java.sql.{Connection, Driver}
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+
+private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions)
+    extends ConnectionProvider {
+  def getConnection(): Connection = {
+    driver.connect(options.url, options.asConnectionProperties)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
new file mode 100644
index 0000000..ccaff0d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+import java.sql.{Connection, Driver}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+
+/**
+ * Connection provider which opens connections toward various databases (a database-specific
+ * instance is needed). If kerberos authentication is required, it is the provider's
+ * responsibility to set all the parameters.
+ */
+private[jdbc] trait ConnectionProvider {
+  def getConnection(): Connection
+}
+
+private[jdbc] object ConnectionProvider extends Logging {
+  def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
+    if (options.keytab == null || options.principal == null) {
+      logDebug("No authentication configuration found, using basic connection provider")
+      new BasicConnectionProvider(driver, options)
+    } else {
+      logDebug("Authentication configuration found, using database specific connection provider")
+      options.driverClass match {
+        case PostgresConnectionProvider.driverClass =>
+          logDebug("Postgres connection provider found")
+          new PostgresConnectionProvider(driver, options)
+
+        case _ =>
+          throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
+            "Kerberos authentication")
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala
new file mode 100644
index 0000000..e793c4d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+import java.sql.{Connection, Driver}
+import java.util.Properties
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider.PGJDBCConfiguration
+import org.apache.spark.util.SecurityUtils
+
+private[jdbc] class PostgresConnectionProvider(driver: Driver, options: JDBCOptions)
+    extends BasicConnectionProvider(driver, options) {
+  val appEntry: String = {
+    val parseURL = driver.getClass.getMethod("parseURL", classOf[String], classOf[Properties])
+    val properties = parseURL.invoke(driver, options.url, null).asInstanceOf[Properties]
+    properties.getProperty("jaasApplicationName", "pgjdbc")
+  }
+
+  def setAuthenticationConfigIfNeeded(): Unit = {
+    val parent = Configuration.getConfiguration
+    val configEntry = parent.getAppConfigurationEntry(appEntry)
+    if (configEntry == null || configEntry.isEmpty) {
+      val config = new PGJDBCConfiguration(parent, appEntry, options.keytab, options.principal)
+      Configuration.setConfiguration(config)
+    }
+  }
+
+  override def getConnection(): Connection = {
+    setAuthenticationConfigIfNeeded()
+    super.getConnection()
+  }
+}
+
+private[sql] object PostgresConnectionProvider {
+  class PGJDBCConfiguration(
+      parent: Configuration,
+      appEntry: String,
+      keytab: String,
+      principal: String) extends Configuration {
+    private val entry =
+      new AppConfigurationEntry(
+        SecurityUtils.getKrb5LoginModuleName(),
+        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+        Map[String, Object](
+          "useTicketCache" -> "false",
+          "useKeyTab" -> "true",
+          "keyTab" -> keytab,
+          "principal" -> principal,
+          "debug" -> "true"
+        ).asJava
+      )
+
+    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+      if (name.equals(appEntry)) {
+        Array(entry)
+      } else {
+        parent.getAppConfigurationEntry(name)
+      }
+    }
+  }
+
+  val driverClass = "org.postgresql.Driver"
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala
new file mode 100644
index 0000000..59ff1c7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+import java.sql.{Driver, DriverManager}
+import javax.security.auth.login.Configuration
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
+
+class PostgresConnectionProviderSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private def options(url: String) = new JDBCOptions(Map[String, String](
+    JDBCOptions.JDBC_URL -> url,
+    JDBCOptions.JDBC_TABLE_NAME -> "table",
+    JDBCOptions.JDBC_KEYTAB -> "/path/to/keytab",
+    JDBCOptions.JDBC_PRINCIPAL -> "principal"
+  ))
+
+  override def afterEach(): Unit = {
+    try {
+      Configuration.setConfiguration(null)
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("setAuthenticationConfigIfNeeded must set authentication if not set") {
+    DriverRegistry.register(PostgresConnectionProvider.driverClass)
+    val driver = DriverManager.getDrivers.asScala.collectFirst {
+      case d if d.getClass.getCanonicalName == PostgresConnectionProvider.driverClass => d
+    }.get
+    val defaultProvider = new PostgresConnectionProvider(
+      driver, options("jdbc:postgresql://localhost/postgres"))
+    val customProvider = new PostgresConnectionProvider(
+      driver, options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc"))
+
+    assert(defaultProvider.appEntry !== customProvider.appEntry)
+
+    // Make sure no authentication for postgres is set
+    assert(Configuration.getConfiguration.getAppConfigurationEntry(
+      defaultProvider.appEntry) == null)
+    assert(Configuration.getConfiguration.getAppConfigurationEntry(
+      customProvider.appEntry) == null)
+
+    // Make sure the first call sets authentication properly
+    val savedConfig = Configuration.getConfiguration
+    defaultProvider.setAuthenticationConfigIfNeeded()
+    val defaultConfig = Configuration.getConfiguration
+    assert(savedConfig != defaultConfig)
+    val defaultAppEntry = defaultConfig.getAppConfigurationEntry(defaultProvider.appEntry)
+    assert(defaultAppEntry != null)
+    customProvider.setAuthenticationConfigIfNeeded()
+    val customConfig = Configuration.getConfiguration
+    assert(savedConfig != customConfig)
+    assert(defaultConfig != customConfig)
+    val customAppEntry = customConfig.getAppConfigurationEntry(customProvider.appEntry)
+    assert(customAppEntry != null)
+
+    // Make sure a second call is not modifying the existing authentication
+    defaultProvider.setAuthenticationConfigIfNeeded()
+    customProvider.setAuthenticationConfigIfNeeded()
+    assert(customConfig == Configuration.getConfiguration)
+    assert(defaultConfig.getAppConfigurationEntry(defaultProvider.appEntry) === defaultAppEntry)
+    assert(customConfig.getAppConfigurationEntry(customProvider.appEntry) === customAppEntry)
+  }
+}

