You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/01/12 20:47:04 UTC

[GitHub] [incubator-gobblin] jhsenjaliya commented on a change in pull request #3157: GOBBLIN-1308 ability to manage token for remote cluster

jhsenjaliya commented on a change in pull request #3157:
URL: https://github.com/apache/incubator-gobblin/pull/3157#discussion_r556081409



##########
File path: gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf, Credentials cred) throws IOEx
     }
 
     if (jhToken == null) {
-      LOG.error("getDelegationTokenFromHS() returned null");
+      log.error("getDelegationTokenFromHS() returned null");
       throw new IOException("Unable to fetch JH token.");
     }
 
-    LOG.info("Created JH token: " + jhToken.toString());
-    LOG.info("Token kind: " + jhToken.getKind());
-    LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
-    LOG.info("Token service: " + jhToken.getService());
+    log.info("Created JH token: " + jhToken.toString());
+    log.info("Token kind: " + jhToken.getKind());
+    log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+    log.info("Token service: " + jhToken.getService());
 
     cred.addToken(jhToken.getService(), jhToken);
   }
 
-  private static void getFsAndJtTokens(final State state, final Configuration conf, final Optional<String> userToProxy,
-      final Credentials cred) throws IOException, InterruptedException {
+  private static void getJtTokens(final Configuration conf, final Credentials cred, final Optional<String> userToProxy,
+      final State state) throws IOException, InterruptedException {
 
     if (userToProxy.isPresent()) {
       UserGroupInformation.createProxyUser(userToProxy.get(), UserGroupInformation.getLoginUser())
           .doAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
-              getFsAndJtTokensImpl(state, conf, cred);
+              getJtTokensImpl(state, conf, cred);
               return null;
             }
           });
     } else {
-      getFsAndJtTokensImpl(state, conf, cred);
+      getJtTokensImpl(state, conf, cred);
     }
   }
 
-  private static void getFsAndJtTokensImpl(final State state, final Configuration conf, final Credentials cred)
+  private static void getJtTokensImpl(final State state, final Configuration conf, final Credentials cred)
       throws IOException {
-    getHdfsToken(conf, cred);
-    if (state.contains(OTHER_NAMENODES)) {
-      getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
-    }
     getJtToken(cred);
   }
 
-  private static void getHdfsToken(Configuration conf, Credentials cred) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    LOG.info("Getting DFS token from " + fs.getUri());
-    String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
-    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
-    for(int i = 0; i < fsTokens.length; i++) {
-      Token<?> token = fsTokens[i];
-      String message =
-          String.format("DFS token fetched from namenode, token kind: %s, token service %s", token.getKind(),
-              token.getService());
-      LOG.info(message);
+  public static void getAllFSTokens(final Configuration conf, final Credentials cred, final String renewer,
+                                    final Optional<String> userToProxy, final List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+    if (userToProxy.isPresent()) {
+      UserGroupInformation.createProxyUser(userToProxy.get(), UserGroupInformation.getLoginUser())
+              .doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+                  return null;
+                }
+              });
+    } else {
+      getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+    }
+  }
+
+  public static void getAllFSTokensImpl(Configuration conf, Credentials cred, String renewer, List<String> remoteFSURIList) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (StringUtils.isEmpty(renewer)) {
+        renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+        log.info("No renewer specified for FS: {}, taking default renewer: {}",  fs.getUri(), renewer);
+      }
+
+      log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " + renewer);
+      Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+      if (fsTokens != null) {
+        for (Token<?> token : fsTokens) {
+          log.info("FS Uri: " + fs.getUri() + " token: " + token);
+        }
+      }
+
+      // Handle remote namenodes if any
+      if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+        getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+      }
+
+      log.debug("All credential tokens: " + cred.getAllTokens());
+    } catch (IOException e) {
+      log.error("Error getting or creating HDFS token with renewer: "+ renewer);
     }
+
   }
 
-  private static void getOtherNamenodesToken(List<String> otherNamenodes, Configuration conf, Credentials cred)
+  public static void getRemoteFSTokenFromURI(Configuration conf, Credentials cred, List<String> otherNamenodes, String renewer)
       throws IOException {
-    LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+    log.debug("Getting tokens for other namenodes: " + otherNamenodes);
     Path[] ps = new Path[otherNamenodes.size()];
     for (int i = 0; i < ps.length; i++) {
       ps[i] = new Path(otherNamenodes.get(i).trim());
+      FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+      if (StringUtils.isEmpty(renewer)) {

Review comment:
       Relooking at this logic: earlier there was no way to specify a renewer. Now that I have added a way to specify one, we have to fall back to the default implementation (which is to take renewer = `getMRTokenRenewerInternal(new JobConf()).toString();`), so it will fall into the original implementation if the user does not specify a renewer. I added an overloaded method `getAllFSTokens` that does not take a renewer at all, so that it looks the same as before.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org