You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/18 18:44:06 UTC

spark git commit: [SPARK-21411][YARN] Lazily create FS within kerberized UGI to avoid token acquiring failure

Repository: spark
Updated Branches:
  refs/heads/master d3f4a2119 -> cde64add1


[SPARK-21411][YARN] Lazily create FS within kerberized UGI to avoid token acquiring failure

## What changes were proposed in this pull request?

In the current `YARNHadoopDelegationTokenManager`, the `FileSystem` instances from which tokens are obtained are created outside of the KDC-logged-in UGI, so using these `FileSystem` instances to get new tokens leads to an exception. The root cause is that Spark tries to get new tokens from a `FileSystem` created with a token-authenticated UGI, but Hadoop can only grant new tokens within a kerberized UGI. To fix this issue, we should lazily create these `FileSystem` instances within the KDC-logged-in UGI.

## How was this patch tested?

Manual verification in secure cluster.

CC vanzin mgummelt please help to review, thanks!

Author: jerryshao <ss...@hortonworks.com>

Closes #18633 from jerryshao/SPARK-21411.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cde64add
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cde64add
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cde64add

Branch: refs/heads/master
Commit: cde64add18dac712c48de0637f1979f1043e333e
Parents: d3f4a21
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Jul 18 11:44:01 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 18 11:44:01 2017 -0700

----------------------------------------------------------------------
 .../spark/deploy/security/HadoopDelegationTokenManager.scala | 2 +-
 .../deploy/security/HadoopFSDelegationTokenProvider.scala    | 7 ++++---
 .../deploy/security/HadoopDelegationTokenManagerSuite.scala  | 8 ++++----
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala     | 2 +-
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
 .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala   | 2 +-
 .../yarn/security/YARNHadoopDelegationTokenManager.scala     | 2 +-
 .../security/YARNHadoopDelegationTokenManagerSuite.scala     | 2 +-
 8 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 89b6f52..01cbfe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 private[spark] class HadoopDelegationTokenManager(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    fileSystems: Set[FileSystem])
+    fileSystems: Configuration => Set[FileSystem])
   extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 13157f3..f0ac7f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 
-private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
     extends HadoopDelegationTokenProvider with Logging {
 
   // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste
       hadoopConf: Configuration,
       creds: Credentials): Option[Long] = {
 
+    val fsToGetTokens = fileSystems(hadoopConf)
     val newCreds = fetchDelegationTokens(
       getTokenRenewer(hadoopConf),
-      fileSystems)
+      fsToGetTokens)
 
     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
     }
 
     // Get the time of next renewal.

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 335f344..5b05521 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
@@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
     val creds = new Credentials()
 
     // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ce290c3..6ff210a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -270,7 +270,7 @@ private[spark] class ApplicationMaster(
             val credentialManager = new YARNHadoopDelegationTokenManager(
               sparkConf,
               yarnConf,
-              YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+              conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
             val credentialRenewer =
               new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 3a7adb7..d408ca9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -125,7 +125,7 @@ private[spark] class Client(
   private val credentialManager = new YARNHadoopDelegationTokenManager(
     sparkConf,
     hadoopConf,
-    YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+    conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a687f67..4fef439 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val credentialManager = new YARNHadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
     credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
     credentialUpdater.start()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index bbd17c8..163cfb4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.Utils
 private[yarn] class YARNHadoopDelegationTokenManager(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    fileSystems: Set[FileSystem]) extends Logging {
+    fileSystems: Configuration => Set[FileSystem]) extends Logging {
 
   private val delegationTokenManager =
     new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 2b226ef..c918998 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
     credentialManager = new YARNHadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
     credentialManager.credentialProviders.get("yarn-test") should not be (None)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org