You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/18 18:44:06 UTC
spark git commit: [SPARK-21411][YARN] Lazily create FS within
kerberized UGI to avoid token acquiring failure
Repository: spark
Updated Branches:
refs/heads/master d3f4a2119 -> cde64add1
[SPARK-21411][YARN] Lazily create FS within kerberized UGI to avoid token acquiring failure
## What changes were proposed in this pull request?
In the current `YARNHadoopDelegationTokenManager`, the `FileSystem` instances from which tokens are obtained are created outside of the KDC-logged-in UGI; using these `FileSystem` instances to get new tokens will lead to an exception. The root cause is that Spark code tries to get new tokens from a FS created under a token-authenticated UGI, but Hadoop can only grant new tokens under a kerberized UGI. To fix this issue, we should lazily create these FileSystem instances within the KDC-logged-in UGI.
## How was this patch tested?
Manual verification in a secure cluster.
CC vanzin mgummelt please help to review, thanks!
Author: jerryshao <ss...@hortonworks.com>
Closes #18633 from jerryshao/SPARK-21411.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cde64add
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cde64add
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cde64add
Branch: refs/heads/master
Commit: cde64add18dac712c48de0637f1979f1043e333e
Parents: d3f4a21
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Jul 18 11:44:01 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 18 11:44:01 2017 -0700
----------------------------------------------------------------------
.../spark/deploy/security/HadoopDelegationTokenManager.scala | 2 +-
.../deploy/security/HadoopFSDelegationTokenProvider.scala | 7 ++++---
.../deploy/security/HadoopDelegationTokenManagerSuite.scala | 8 ++++----
.../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +-
.../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
.../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +-
.../yarn/security/YARNHadoopDelegationTokenManager.scala | 2 +-
.../security/YARNHadoopDelegationTokenManagerSuite.scala | 2 +-
8 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 89b6f52..01cbfe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
private[spark] class HadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
- fileSystems: Set[FileSystem])
+ fileSystems: Configuration => Set[FileSystem])
extends Logging {
private val deprecatedProviderEnabledConfigs = List(
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 13157f3..f0ac7f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
-private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
extends HadoopDelegationTokenProvider with Logging {
// This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste
hadoopConf: Configuration,
creds: Credentials): Option[Long] = {
+ val fsToGetTokens = fileSystems(hadoopConf)
val newCreds = fetchDelegationTokens(
getTokenRenewer(hadoopConf),
- fileSystems)
+ fsToGetTokens)
// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
- tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
+ tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
}
// Get the time of next renewal.
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 335f344..5b05521 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- hadoopFSsToAccess(hadoopConf))
+ hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- hadoopFSsToAccess(hadoopConf))
+ hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- hadoopFSsToAccess(hadoopConf))
+ hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
@@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- hadoopFSsToAccess(hadoopConf))
+ hadoopFSsToAccess)
val creds = new Credentials()
// Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ce290c3..6ff210a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -270,7 +270,7 @@ private[spark] class ApplicationMaster(
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
yarnConf,
- YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+ conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
val credentialRenewer =
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 3a7adb7..d408ca9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -125,7 +125,7 @@ private[spark] class Client(
private val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+ conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
def reportLauncherState(state: SparkAppHandle.State): Unit = {
launcherBackend.setState(state)
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a687f67..4fef439 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+ conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
credentialUpdater.start()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index bbd17c8..163cfb4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.Utils
private[yarn] class YARNHadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
- fileSystems: Set[FileSystem]) extends Logging {
+ fileSystems: Configuration => Set[FileSystem]) extends Logging {
private val delegationTokenManager =
new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 2b226ef..c918998 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
hadoopConf,
- YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+ conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
credentialManager.credentialProviders.get("yarn-test") should not be (None)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org