You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/07/02 08:01:48 UTC
spark git commit: [SPARK-8688] [YARN] Bug fix: disable the cache fs
to gain the HDFS connection.
Repository: spark
Updated Branches:
refs/heads/master 792fcd802 -> 646366b5d
[SPARK-8688] [YARN] Bug fix: disable the cache fs to gain the HDFS connection.
If `fs.hdfs.impl.disable.cache` is `false` (the default), `FileSystem` reuses the cached `DFSClient`, which still holds the old token.
[AMDelegationTokenRenewer](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala#L196)
```scala
val credentials = UserGroupInformation.getCurrentUser.getCredentials
credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
```
Although `credentials` contains the new token, the write still goes through the cached client, which uses the old token.
It is therefore better to set `fs.hdfs.impl.disable.cache` to `true`, so that an expired token is not used.
[Jira](https://issues.apache.org/jira/browse/SPARK-8688)
Author: huangzhaowei <ca...@gmail.com>
Closes #7069 from SaintBacchus/SPARK-8688 and squashes the following commits:
f94cd0b [huangzhaowei] modify function parameter
8fb9eb9 [huangzhaowei] explicit the comment
0cd55c9 [huangzhaowei] Rename function name to be an accurate one
cf776a1 [huangzhaowei] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the HDFS connection.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/646366b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/646366b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/646366b5
Branch: refs/heads/master
Commit: 646366b5d2f12e42f8e7287672ba29a8c918a17d
Parents: 792fcd8
Author: huangzhaowei <ca...@gmail.com>
Authored: Wed Jul 1 23:01:44 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jul 1 23:01:44 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/SparkHadoopUtil.scala | 13 +++++++++++++
.../spark/deploy/yarn/AMDelegationTokenRenewer.scala | 10 ++++++----
.../deploy/yarn/ExecutorDelegationTokenUpdater.scala | 5 ++++-
3 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/646366b5/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 7fa75ac..6d14590 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -334,6 +334,19 @@ class SparkHadoopUtil extends Logging {
* Stop the thread that does the delegation token updates.
*/
private[spark] def stopExecutorDelegationTokenRenewer() {}
+
+ /**
+ * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
+ * This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
+ */
+ private[spark] def getConfBypassingFSCache(
+ hadoopConf: Configuration,
+ scheme: String): Configuration = {
+ val newConf = new Configuration(hadoopConf)
+ val confKey = s"fs.${scheme}.impl.disable.cache"
+ newConf.setBoolean(confKey, true)
+ newConf
+ }
}
object SparkHadoopUtil {
http://git-wip-us.apache.org/repos/asf/spark/blob/646366b5/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 77af46c..56e4741 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
private val numFilesToKeep =
sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+ private val freshHadoopConf =
+ hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
/**
* Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
private def cleanupOldFiles(): Unit = {
import scala.concurrent.duration._
try {
- val remoteFs = FileSystem.get(hadoopConf)
+ val remoteFs = FileSystem.get(freshHadoopConf)
val credentialsPath = new Path(credentialsFile)
val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
// Get a copy of the credentials
override def run(): Void = {
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
- hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+ hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
null
}
})
// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
- val remoteFs = FileSystem.get(hadoopConf)
+ val remoteFs = FileSystem.get(freshHadoopConf)
// If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
// was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
// and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
val credentials = UserGroupInformation.getCurrentUser.getCredentials
- credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+ credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
remoteFs.rename(tempTokenPath, tokenPath)
logInfo("Delegation token file rename complete.")
http://git-wip-us.apache.org/repos/asf/spark/blob/646366b5/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 229c2c4..94feb63 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -35,6 +35,9 @@ private[spark] class ExecutorDelegationTokenUpdater(
@volatile private var lastCredentialsFileSuffix = 0
private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+ private val freshHadoopConf =
+ SparkHadoopUtil.get.getConfBypassingFSCache(
+ hadoopConf, new Path(credentialsFile).toUri.getScheme)
private val delegationTokenRenewer =
Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +52,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
def updateCredentialsIfRequired(): Unit = {
try {
val credentialsFilePath = new Path(credentialsFile)
- val remoteFs = FileSystem.get(hadoopConf)
+ val remoteFs = FileSystem.get(freshHadoopConf)
SparkHadoopUtil.get.listFilesSorted(
remoteFs, credentialsFilePath.getParent,
credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org