You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/12/12 18:40:45 UTC

twill git commit: TWILL-262 YarnUtils#cloneHaNnCredentials uses DFSUtil#getHaNnRpcAddresses, which is removed from DFSUtils from hadoop-2.8

Repository: twill
Updated Branches:
  refs/heads/master a91ecd403 -> 6d1490ba2


TWILL-262 YarnUtils#cloneHaNnCredentials uses DFSUtil#getHaNnRpcAddresses, which is removed from DFSUtils from hadoop-2.8

This closes #71 on Github

Signed-off-by: Terence Yim <te...@google.com>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d1490ba
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d1490ba
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d1490ba

Branch: refs/heads/master
Commit: 6d1490ba221a7b2bd6b56f64ef50c9539e0e4096
Parents: a91ecd4
Author: lihongyuan <li...@cmss.chinamobile.com>
Authored: Sat Sep 29 14:49:47 2018 +0800
Committer: Terence Yim <te...@google.com>
Committed: Wed Dec 12 10:39:09 2018 -0800

----------------------------------------------------------------------
 .../apache/twill/internal/yarn/YarnUtils.java   | 77 +++++++++++++++++++-
 1 file changed, 74 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/6d1490ba/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index d7e6eb0..3a2f4a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -51,16 +51,21 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
+
 /**
  * Collection of helper methods to simplify YARN calls.
  */
 public class YarnUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+
+
   /**
    * Defines different versions of Hadoop.
    */
@@ -72,7 +77,32 @@ public class YarnUtils {
     HADOOP_26
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+  private static boolean hasDFSUtilClient = false; // use this to judge if the hadoop version is above 2.8
+
+  private static boolean hasHAUtilsClient = false;
+
+  private static Method getHaNnRpcAddressesMethod;
+
+  private static Method cloneDelegationTokenForLogicalUriMethod;
+
+  static {
+    try {
+      Class dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtilClient");
+      getHaNnRpcAddressesMethod = dfsUtilsClientClazz.getMethod("getHaNnRpcAddresses",
+          Configuration.class);
+      hasDFSUtilClient = true;
+      Class haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtilClient");
+      cloneDelegationTokenForLogicalUriMethod = haUtilClientClazz.getMethod(
+          "cloneDelegationTokenForLogicalUri", UserGroupInformation.class,
+          URI.class, Collection.class);
+      hasHAUtilsClient = true;
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No such class", e);
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No such method", e);
+    }
+  }
+
   private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<>();
 
   public static YarnLocalResource createLocalResource(LocalFile localFile) {
@@ -185,7 +215,7 @@ public class YarnUtils {
                                           CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
 
     // Loop through all name services. Each name service could have multiple name node associated with it.
-    for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry : getHaNnRpcAddresses(config).entrySet()) {
       String nsId = entry.getKey();
       Map<String, InetSocketAddress> addressesInNN = entry.getValue();
       if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
@@ -198,7 +228,47 @@ public class YarnUtils {
       URI uri = URI.create(scheme + "://" + nsId);
 
       LOG.info("Cloning delegation token for uri {}", uri);
-      HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+      cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+    }
+  }
+
+  /**
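+   * Clones the delegation token for the logical HA URI. Uses HAUtilClient via reflection when it is available,
+   * because newer Hadoop has no HAUtil#cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); else uses HAUtil.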
+   */
+  private static void cloneDelegationTokenForLogicalUri(UserGroupInformation ugi, URI haUri,
+                                                        Collection<InetSocketAddress> nnAddrs) {
+    if (hasHAUtilsClient) {
+      invokeStaticMethodWithExceptionHandled(cloneDelegationTokenForLogicalUriMethod, ugi, haUri, nnAddrs);
+    } else {
+      HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+    }
+  }
+
+
+  /**
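+   * Gets HA NameNode RPC addresses via DFSUtilClient if present (DFSUtil lost this method in Hadoop 2.8).
+   * @param config the Hadoop configuration passed to the underlying Hadoop call
+   * @return map from name service id to the name node RPC addresses in that name service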
+   */
+  private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(Configuration config) {
+    return hasDFSUtilClient ? getHaNnRpcAddressesUseDFSUtilClient(config) :
+        DFSUtil.getHaNnRpcAddresses(config);
+  }
+
+  private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddressesUseDFSUtilClient(Configuration config) {
+    return (Map) invokeStaticMethodWithExceptionHandled(getHaNnRpcAddressesMethod, config);
+  }
+
+  private static Object invokeStaticMethodWithExceptionHandled(Method method, Object ... args) {
+    Preconditions.checkNotNull(method);
+    try {
+      return method.invoke(null, args);
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(e.getMessage(), e);
+      }
+      throw Throwables.propagate(e);
     }
   }
 
@@ -345,6 +415,7 @@ public class YarnUtils {
     return null;
   }
 
+
   private YarnUtils() {
   }
 }