You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/13 22:25:44 UTC
spark git commit: [SPARK-21376][YARN] Fix yarn client token expire
issue when cleaning the staging files in long running scenario
Repository: spark
Updated Branches:
refs/heads/master 5c8edfc4a -> cb8d5cc90
[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in long running scenario
## What changes were proposed in this pull request?
This issue happens in long running applications with yarn cluster mode, because yarn#client doesn't sync tokens with the AM, so it always keeps the initial token. This token may expire in the long running scenario, so when yarn#client tries to clean up the staging directory after the application finishes, it uses the expired token and hits a token expiry issue.
## How was this patch tested?
Manual verification in a secure cluster.
Author: jerryshao <ss...@hortonworks.com>
Closes #18617 from jerryshao/SPARK-21376.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb8d5cc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb8d5cc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb8d5cc9
Branch: refs/heads/master
Commit: cb8d5cc90ff8d3c991ff33da41b136ab7634f71b
Parents: 5c8edfc
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Jul 13 15:25:38 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Jul 13 15:25:38 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 35 +++++++++++++++-----
1 file changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cb8d5cc9/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7caaa91..a5b0e19 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -192,16 +193,32 @@ private[spark] class Client(
* Cleanup application staging directory.
*/
private def cleanupStagingDir(appId: ApplicationId): Unit = {
- val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
- try {
- val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
- val fs = stagingDirPath.getFileSystem(hadoopConf)
- if (!preserveFiles && fs.delete(stagingDirPath, true)) {
- logInfo(s"Deleted staging directory $stagingDirPath")
+ if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+ return
+ }
+
+ def cleanupStagingDirInternal(): Unit = {
+ val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+ try {
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
+ if (fs.delete(stagingDirPath, true)) {
+ logInfo(s"Deleted staging directory $stagingDirPath")
+ }
+ } catch {
+ case ioe: IOException =>
+ logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
- } catch {
- case ioe: IOException =>
- logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+
+ if (isClusterMode && principal != null && keytab != null) {
+ val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+ newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+ override def run(): Unit = {
+ cleanupStagingDirInternal()
+ }
+ })
+ } else {
+ cleanupStagingDirInternal()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org