You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/01/12 20:48:00 UTC

[jira] [Work logged] (GOBBLIN-1308) Gobblin's kerberos token management for remote clusters

     [ https://issues.apache.org/jira/browse/GOBBLIN-1308?focusedWorklogId=535029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-535029 ]

ASF GitHub Bot logged work on GOBBLIN-1308:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jan/21 20:47
            Start Date: 12/Jan/21 20:47
    Worklog Time Spent: 10m 
      Work Description: jhsenjaliya commented on a change in pull request #3157:
URL: https://github.com/apache/incubator-gobblin/pull/3157#discussion_r556081409



##########
File path: gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf, Credentials cred) throws IOEx
     }
 
     if (jhToken == null) {
-      LOG.error("getDelegationTokenFromHS() returned null");
+      log.error("getDelegationTokenFromHS() returned null");
       throw new IOException("Unable to fetch JH token.");
     }
 
-    LOG.info("Created JH token: " + jhToken.toString());
-    LOG.info("Token kind: " + jhToken.getKind());
-    LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
-    LOG.info("Token service: " + jhToken.getService());
+    log.info("Created JH token: " + jhToken.toString());
+    log.info("Token kind: " + jhToken.getKind());
+    log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+    log.info("Token service: " + jhToken.getService());
 
     cred.addToken(jhToken.getService(), jhToken);
   }
 
-  private static void getFsAndJtTokens(final State state, final Configuration conf, final Optional<String> userToProxy,
-      final Credentials cred) throws IOException, InterruptedException {
+  private static void getJtTokens(final Configuration conf, final Credentials cred, final Optional<String> userToProxy,
+      final State state) throws IOException, InterruptedException {
 
     if (userToProxy.isPresent()) {
       UserGroupInformation.createProxyUser(userToProxy.get(), UserGroupInformation.getLoginUser())
           .doAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
-              getFsAndJtTokensImpl(state, conf, cred);
+              getJtTokensImpl(state, conf, cred);
               return null;
             }
           });
     } else {
-      getFsAndJtTokensImpl(state, conf, cred);
+      getJtTokensImpl(state, conf, cred);
     }
   }
 
-  private static void getFsAndJtTokensImpl(final State state, final Configuration conf, final Credentials cred)
+  private static void getJtTokensImpl(final State state, final Configuration conf, final Credentials cred)
       throws IOException {
-    getHdfsToken(conf, cred);
-    if (state.contains(OTHER_NAMENODES)) {
-      getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
-    }
     getJtToken(cred);
   }
 
-  private static void getHdfsToken(Configuration conf, Credentials cred) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    LOG.info("Getting DFS token from " + fs.getUri());
-    String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
-    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
-    for(int i = 0; i < fsTokens.length; i++) {
-      Token<?> token = fsTokens[i];
-      String message =
-          String.format("DFS token fetched from namenode, token kind: %s, token service %s", token.getKind(),
-              token.getService());
-      LOG.info(message);
+  public static void getAllFSTokens(final Configuration conf, final Credentials cred, final String renewer,
+                                    final Optional<String> userToProxy, final List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+    if (userToProxy.isPresent()) {
+      UserGroupInformation.createProxyUser(userToProxy.get(), UserGroupInformation.getLoginUser())
+              .doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+                  return null;
+                }
+              });
+    } else {
+      getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+    }
+  }
+
+  public static void getAllFSTokensImpl(Configuration conf, Credentials cred, String renewer, List<String> remoteFSURIList) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (StringUtils.isEmpty(renewer)) {
+        renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+        log.info("No renewer specified for FS: {}, taking default renewer: {}",  fs.getUri(), renewer);
+      }
+
+      log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " + renewer);
+      Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+      if (fsTokens != null) {
+        for (Token<?> token : fsTokens) {
+          log.info("FS Uri: " + fs.getUri() + " token: " + token);
+        }
+      }
+
+      // Handle remote namenodes if any
+      if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+        getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+      }
+
+      log.debug("All credential tokens: " + cred.getAllTokens());
+    } catch (IOException e) {
+      log.error("Error getting or creating HDFS token with renewer: "+ renewer);
     }
+
   }
 
-  private static void getOtherNamenodesToken(List<String> otherNamenodes, Configuration conf, Credentials cred)
+  public static void getRemoteFSTokenFromURI(Configuration conf, Credentials cred, List<String> otherNamenodes, String renewer)
       throws IOException {
-    LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+    log.debug("Getting tokens for other namenodes: " + otherNamenodes);
     Path[] ps = new Path[otherNamenodes.size()];
     for (int i = 0; i < ps.length; i++) {
       ps[i] = new Path(otherNamenodes.get(i).trim());
+      FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+      if (StringUtils.isEmpty(renewer)) {

Review comment:
       Re-looking at this logic: earlier there was no way to specify a renewer. Now that I have added a way to specify one, we have to fall back to the default implementation (which takes renewer = `getMRTokenRenewerInternal(new JobConf()).toString();`), so it falls into the original implementation if the user does not specify a renewer. I also added an overloaded method `getAllFSTokens` that does not take a renewer at all, so it looks the same as before.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 535029)
    Time Spent: 2h 50m  (was: 2h 40m)

> Gobblin's kerberos token management for remote clusters
> -------------------------------------------------------
>
>                 Key: GOBBLIN-1308
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1308
>             Project: Apache Gobblin
>          Issue Type: Improvement
>    Affects Versions: 0.15.0
>            Reporter: Jay Sen
>            Priority: Major
>             Fix For: 0.16.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Gobblin's Hadoop token/key management:
>  Problem: When key management is enabled, Gobblin only maintains tokens for the local cluster; it cannot manage tokens for remote Hadoop clusters. (Based on my conversations with many folks here, the token files can be made available externally, but that would require an external system running on cron or something similar.)
> Solution: Add remote-cluster token management to Gobblin, so that remote clusters' keys can be managed the same way as the local cluster's keys.
>  
> The config looks like the following
> (this renames the enable.key.management config to key.management.enabled):
>  
> {code:java}
> gobblin.hadoop.key.management {
>  enabled = true
>  remote.clusters = [ ${gobblin_sync_systems.hadoop_cluster1}, ${gobblin_sync_systems.hadoop_cluster2} ]
> }
> // These Gobblin platform configurations can be moved to a database for other use cases, but this layout helps make the platform modular for each connector.
> gobblin_sync_systems {
>  hadoop_cluster1 {
>  // If the Hadoop config path is specified, the FileSystem will be created from all the XML config found there, which has all the required info.
>  hadoop_config_path = "file:///etc/hadoop_cluster1/hadoop/config"
>  // If the Hadoop config path is not specified, you can still list the specific nodes for each type of token.
>  namenode_uri = ["hdfs://nn1.hadoop_cluster1.example.com:8020", "hdfs://nn2.hadoop_cluster1.example.com:8020"]
>  kms_nodes = [ "kms1.hadoop_cluster1.example.com:9292", "kms2.hadoop_cluster1.example.com:9292" ]
>  }
>  hadoop_cluster2 {
>  hadoop_config_path = "file:///etc/hadoop_cluster2/hadoop/config"
>  namenode_uri = ["hdfs://nn1.hadoop_cluster2.example.com:8020", "hdfs://nn2.hadoop_cluster2.example.com:8020"]
>  kms_nodes = [ "kms1.hadoop_cluster2.example.com:9292", "kms2.hadoop_cluster2.example.com:9292" ]
>  }
> }{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)