Posted to commits@spark.apache.org by ir...@apache.org on 2018/01/29 22:09:22 UTC

spark git commit: [SPARK-23209][core] Allow credential manager to work when Hive not available.

Repository: spark
Updated Branches:
  refs/heads/master e30b34f7b -> b834446ec


[SPARK-23209][core] Allow credential manager to work when Hive not available.

The JVM seems to perform early binding of the classes that the Hive provider
depends on, causing an error to be thrown before it can be caught by the
guard code inside the class.

The fix wraps the creation of the provider in a try/catch so that the
provider can be skipped when its dependencies are missing.
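
For illustration, here is a minimal self-contained sketch of that pattern,
mirroring the safeCreateProvider helper in the patch below (the Provider
trait and class names here are hypothetical, not the actual Spark classes).
The point is that a NoClassDefFoundError raised while linking a class is an
Error, not an Exception, so it escapes a plain "case e: Exception" handler;
taking the constructor call by name and catching Throwable lets the caller
simply drop providers whose dependencies are absent:

    object SafeProviderLoading {
      trait Provider { def serviceName: String }

      // Creation is taken by name, so the constructor only runs inside the
      // try block. NoClassDefFoundError extends Error, not Exception, so a
      // Throwable-level catch is needed to see it.
      def safeCreateProvider(createFn: => Provider): Option[Provider] =
        try {
          Some(createFn)
        } catch {
          case _: Throwable => None
        }

      def main(args: Array[String]): Unit = {
        val providers = Seq(new Provider { def serviceName = "hadoopfs" }) ++
          safeCreateProvider(
            throw new NoClassDefFoundError("org/apache/hadoop/hive/conf/HiveConf"))
        // Prints List(hadoopfs): the provider whose creation failed is dropped.
        println(providers.map(_.serviceName))
      }
    }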

Added a unit test (which fails without the fix), and also tested
that getting tokens still works in a real cluster.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20399 from vanzin/SPARK-23209.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b834446e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b834446e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b834446e

Branch: refs/heads/master
Commit: b834446ec1338349f6d974afd96f677db3e8fd1a
Parents: e30b34f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon Jan 29 16:09:14 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Mon Jan 29 16:09:14 2018 -0600

----------------------------------------------------------------------
 .../security/HadoopDelegationTokenManager.scala | 17 +++++-
 .../HadoopDelegationTokenManagerSuite.scala     | 58 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b834446e/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 116a686..5151df0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -64,9 +64,9 @@ private[spark] class HadoopDelegationTokenManager(
   }
 
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
-    val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
-      new HiveDelegationTokenProvider,
-      new HBaseDelegationTokenProvider)
+    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
+      safeCreateProvider(new HiveDelegationTokenProvider) ++
+      safeCreateProvider(new HBaseDelegationTokenProvider)
 
     // Filter out providers for which spark.security.credentials.{service}.enabled is false.
     providers
@@ -75,6 +75,17 @@ private[spark] class HadoopDelegationTokenManager(
       .toMap
   }
 
+  private def safeCreateProvider(
+      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
+    try {
+      Some(createFn)
+    } catch {
+      case t: Throwable =>
+        logDebug(s"Failed to load built in provider.", t)
+        None
+    }
+  }
+
   def isServiceEnabled(serviceName: String): Boolean = {
     val key = providerEnabledConfig.format(serviceName)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b834446e/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index eeffc36..2849a10 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.security
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.Credentials
@@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     creds.getAllTokens.size should be (0)
   }
 
+  test("SPARK-23209: obtain tokens when Hive classes are not available") {
+    // This test needs a custom class loader to hide Hive classes which are in the classpath.
+    // Because the manager code loads the Hive provider directly instead of using reflection, we
+    // need to drive the test through the custom class loader so a new copy that cannot find
+    // Hive classes is loaded.
+    val currentLoader = Thread.currentThread().getContextClassLoader()
+    val noHive = new ClassLoader() {
+      override def loadClass(name: String, resolve: Boolean): Class[_] = {
+        if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) {
+          throw new ClassNotFoundException(name)
+        }
+
+        if (name.startsWith("java") || name.startsWith("scala")) {
+          currentLoader.loadClass(name)
+        } else {
+          val classFileName = name.replaceAll("\\.", "/") + ".class"
+          val in = currentLoader.getResourceAsStream(classFileName)
+          if (in != null) {
+            val bytes = IOUtils.toByteArray(in)
+            defineClass(name, bytes, 0, bytes.length)
+          } else {
+            throw new ClassNotFoundException(name)
+          }
+        }
+      }
+    }
+
+    try {
+      Thread.currentThread().setContextClassLoader(noHive)
+      val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
+      test.getMethod("runTest").invoke(null)
+    } finally {
+      Thread.currentThread().setContextClassLoader(currentLoader)
+    }
+  }
+
   private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = {
     Set(FileSystem.get(hadoopConf))
   }
 }
+
+/** Test code for SPARK-23209 to avoid using too much reflection above. */
+private object NoHiveTest extends Matchers {
+
+  def runTest(): Unit = {
+    try {
+      val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(),
+        _ => Set())
+      manager.getServiceDelegationTokenProvider("hive") should be (None)
+    } catch {
+      case e: Throwable =>
+        // Throw a better exception in case the test fails, since there may be a lot of nesting.
+        var cause = e
+        while (cause.getCause() != null) {
+          cause = cause.getCause()
+        }
+        throw cause
+    }
+  }
+
+}

