You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 12:19:56 UTC
[5/8] flink git commit: [FLINK-6376] [yarn] Improve inline code
comments related to HDFS delegation token inclusion
[FLINK-6376] [yarn] Improve inline code comments related to HDFS delegation token inclusion
This closes #3776.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e13e192e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e13e192e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e13e192e
Branch: refs/heads/release-1.3
Commit: e13e192e150051e6c0510754afaf6add9a92d6ac
Parents: 073852b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jun 30 15:36:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 1 16:27:32 2017 +0800
----------------------------------------------------------------------
.../runtime/security/modules/HadoopModule.java | 16 +++++++++-------
.../flink/yarn/AbstractYarnClusterDescriptor.java | 2 +-
.../src/main/java/org/apache/flink/yarn/Utils.java | 13 ++++++-------
3 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e13e192e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 8ea3f2c..5c62272 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -60,18 +60,19 @@ public class HadoopModule implements SecurityModule {
// supplement with any available tokens
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
- /*
- * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
- * used in the context of reading the stored tokens from UGI.
- * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
- * loginUser.addCredentials(cred);
- * Notify:If UGI use the keytab for login, do not load HDFS delegation token.
- */
+ // Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+ // used in the context of reading the stored tokens from UGI.
+ // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+ // loginUser.addCredentials(cred);
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
securityConfig.getHadoopConfiguration());
+
+ // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
+ // the UGI would prefer the delegation token instead, which eventually expires
+ // and does not fall back to using Kerberos tickets
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
@@ -83,6 +84,7 @@ public class HadoopModule implements SecurityModule {
credentials.addToken(id, token);
}
}
+
Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, credentials);
http://git-wip-us.apache.org/repos/asf/flink/blob/e13e192e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 418a0ba..986df45 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -799,7 +799,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
if (UserGroupInformation.isSecurityEnabled()) {
- //set tokens when security is enable
+ // set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, conf);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e13e192e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 3276486..6530b1a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -462,14 +462,13 @@ public final class Utils {
try (DataOutputBuffer dob = new DataOutputBuffer()) {
log.debug("Adding security tokens to Task Executor Container launch Context....");
- /*
- * For taskmanager yarn container context, read the tokens from the jobmanager yarn container local flie.
- * Notify: must read the tokens from the local file, but not from UGI context.Because if UGI is login
- * from Keytab, there is no HDFS degegation token in UGI context.
- */
+
+ // For TaskManager YARN container context, read the tokens from the JobManager YARN container local file.
+ // NOTE: must read the tokens from the local file, not from the UGI context, because if UGI is logged in
+ // using Kerberos keytabs, there is no HDFS delegation token in the UGI context.
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
- File.class, org.apache.hadoop.conf.Configuration.class);
+ Method readTokenStorageFileMethod = Credentials.class.getMethod(
+ "readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration());
cred.writeTokenStorageToStream(dob);