You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/30 17:32:52 UTC

[spark] branch master updated: [SPARK-31336][SQL] Support Oracle Kerberos login in JDBC connector

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 67cb7ea  [SPARK-31336][SQL] Support Oracle Kerberos login in JDBC connector
67cb7ea is described below

commit 67cb7eaa6572770de0a6cc2f871eacdb15a572b2
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Tue Jun 30 10:30:22 2020 -0700

    [SPARK-31336][SQL] Support Oracle Kerberos login in JDBC connector
    
    ### What changes were proposed in this pull request?
    When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (yarn-client/cluster etc. modes) fail to establish a connection because they lack a Kerberos ticket or the ability to generate one.
    
    This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in an enterprise environment where exposing simple authentication access is not an option due to IT policy issues.
    
    In this PR I've added Oracle support.
    
    What this PR contains:
    * Added `OracleConnectionProvider`
    * Added `OracleConnectionProviderSuite`
    
    ### Why are the changes needed?
    Missing JDBC kerberos support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, users are now able to connect to Oracle using Kerberos.
    
    ### How was this patch tested?
    * Additional + existing unit tests
    * Tested manually on a cluster
    
    Closes #28863 from gaborgsomogyi/SPARK-31336.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 sql/core/pom.xml                                   |  5 ++
 .../jdbc/connection/ConnectionProvider.scala       |  4 ++
 .../jdbc/connection/OracleConnectionProvider.scala | 62 ++++++++++++++++++++++
 .../connection/OracleConnectionProviderSuite.scala | 28 ++++++++++
 4 files changed, 99 insertions(+)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 0855fa1..c2ed4c0 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -151,6 +151,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.oracle.database.jdbc</groupId>
+      <artifactId>ojdbc8</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
       <scope>test</scope>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
index 6c310ce..ce45be4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
@@ -64,6 +64,10 @@ private[jdbc] object ConnectionProvider extends Logging {
           logDebug("MS SQL connection provider found")
           new MSSQLConnectionProvider(driver, options)
 
+        case OracleConnectionProvider.driverClass =>
+          logDebug("Oracle connection provider found")
+          new OracleConnectionProvider(driver, options)
+
         case _ =>
           throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
             "Kerberos authentication")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala
new file mode 100644
index 0000000..c2b71b3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+import java.security.PrivilegedExceptionAction
+import java.sql.{Connection, Driver}
+import java.util.Properties
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+
+private[sql] class OracleConnectionProvider(driver: Driver, options: JDBCOptions)
+  extends SecureConnectionProvider(driver, options) { // Provider for Kerberos logins to Oracle.
+  override val appEntry: String = "kprb5module" // Auth config entry name (TODO confirm usage).
+
+  override def getConnection(): Connection = { // Connects as the principal from the keytab.
+    setAuthenticationConfigIfNeeded() // Runs first, ahead of the keytab login below.
+    UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
+      new PrivilegedExceptionAction[Connection]() {
+        override def run(): Connection = {
+          OracleConnectionProvider.super.getConnection() // Parent's connect, run inside doAs.
+        }
+      }
+    )
+  }
+
+  override def getAdditionalProperties(): Properties = {
+    val result = new Properties()
+    // This property turns on Kerberos authentication in the Oracle JDBC driver.
+    // The possible values are listed in the driver's AnoServices public interface.
+    // The value used here is the AUTHENTICATION_KERBEROS5 constant from driver version 19.6.0.0.
+    result.put("oracle.net.authentication_services", "(KERBEROS5)");
+    result
+  }
+
+  override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
+    val (parent, configEntry) = getConfigWithAppEntry()
+    if (configEntry == null || configEntry.isEmpty) { // Only when no entry was found.
+      setAuthenticationConfig(parent)
+    }
+  }
+}
+
+private[sql] object OracleConnectionProvider {
+  val driverClass = "oracle.jdbc.OracleDriver" // Matched in ConnectionProvider to pick this provider.
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala
new file mode 100644
index 0000000..13cde32
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc.connection
+
+class OracleConnectionProviderSuite extends ConnectionProviderSuiteBase {
+  test("setAuthenticationConfigIfNeeded must set authentication if not set") {
+    val driver = registerDriver(OracleConnectionProvider.driverClass) // Helper defined outside this file.
+    val provider = new OracleConnectionProvider(driver,
+      options("jdbc:oracle:thin:@//localhost/xe"))
+
+    testSecureConnectionProvider(provider) // Shared check, presumably from the suite base class.
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org