You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/12/12 18:40:45 UTC
twill git commit: TWILL-262 YarnUtils#cloneHaNnCredentials uses
DFSUtil#getHaNnRpcAddresses, which is removed from DFSUtils from hadoop-2.8
Repository: twill
Updated Branches:
refs/heads/master a91ecd403 -> 6d1490ba2
TWILL-262 YarnUtils#cloneHaNnCredentials uses DFSUtil#getHaNnRpcAddresses, which is removed from DFSUtils from hadoop-2.8
This closes #71 on Github
Signed-off-by: Terence Yim <te...@google.com>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d1490ba
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d1490ba
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d1490ba
Branch: refs/heads/master
Commit: 6d1490ba221a7b2bd6b56f64ef50c9539e0e4096
Parents: a91ecd4
Author: lihongyuan <li...@cmss.chinamobile.com>
Authored: Sat Sep 29 14:49:47 2018 +0800
Committer: Terence Yim <te...@google.com>
Committed: Wed Dec 12 10:39:09 2018 -0800
----------------------------------------------------------------------
.../apache/twill/internal/yarn/YarnUtils.java | 77 +++++++++++++++++++-
1 file changed, 74 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/6d1490ba/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index d7e6eb0..3a2f4a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -51,16 +51,21 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+
/**
* Collection of helper methods to simplify YARN calls.
*/
public class YarnUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+
+
/**
* Defines different versions of Hadoop.
*/
@@ -72,7 +77,32 @@ public class YarnUtils {
HADOOP_26
}
- private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+ private static boolean hasDFSUtilClient = false; // true when DFSUtilClient#getHaNnRpcAddresses was found via reflection (Hadoop 2.8 onward, per TWILL-262)
+
+ private static boolean hasHAUtilsClient = false;
+
+ private static Method getHaNnRpcAddressesMethod;
+
+ private static Method cloneDelegationTokenForLogicalUriMethod;
+
+ static {
+ try {
+ Class dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtilClient");
+ getHaNnRpcAddressesMethod = dfsUtilsClientClazz.getMethod("getHaNnRpcAddresses",
+ Configuration.class);
+ hasDFSUtilClient = true;
+ Class haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtilClient");
+ cloneDelegationTokenForLogicalUriMethod = haUtilClientClazz.getMethod(
+ "cloneDelegationTokenForLogicalUri", UserGroupInformation.class,
+ URI.class, Collection.class);
+ hasHAUtilsClient = true;
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No such class", e);
+ } catch (NoSuchMethodException e) {
+ LOG.debug("No such method", e);
+ }
+ }
+
private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<>();
public static YarnLocalResource createLocalResource(LocalFile localFile) {
@@ -185,7 +215,7 @@ public class YarnUtils {
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
// Loop through all name services. Each name service could have multiple name node associated with it.
- for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+ for (Map.Entry<String, Map<String, InetSocketAddress>> entry : getHaNnRpcAddresses(config).entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> addressesInNN = entry.getValue();
if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
@@ -198,7 +228,47 @@ public class YarnUtils {
URI uri = URI.create(scheme + "://" + nsId);
LOG.info("Cloning delegation token for uri {}", uri);
- HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+ cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+ }
+ }
+
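+ /**
+ * Clones the delegation token for a logical HA URI, calling HAUtilClient#cloneDelegationTokenForLogicalUri via reflection when it is available.
+ * Newer Hadoop releases (2.8 onward, per TWILL-262) no longer have HAUtil#cloneDelegationTokenForLogicalUri(UserGroupInformation, URI, Collection).
+ */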
+ private static void cloneDelegationTokenForLogicalUri(UserGroupInformation ugi, URI haUri,
+ Collection<InetSocketAddress> nnAddrs) {
+ if (hasHAUtilsClient) {
+ invokeStaticMethodWithExceptionHandled(cloneDelegationTokenForLogicalUriMethod, ugi, haUri, nnAddrs);
+ } else {
+ HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+ }
+ }
+
+
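+ /**
+ * Newer Hadoop releases (2.8 onward, per TWILL-262) have no DFSUtil#getHaNnRpcAddresses(Configuration); DFSUtilClient is used via reflection when available.
+ * @param config the Hadoop configuration to look up the HA name node RPC addresses from
+ * @return a map from name service id to the name node RPC addresses of that name service
+ */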
+ private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(Configuration config) {
+ return hasDFSUtilClient ? getHaNnRpcAddressesUseDFSUtilClient(config) :
+ DFSUtil.getHaNnRpcAddresses(config);
+ }
+
+ private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddressesUseDFSUtilClient(Configuration config) {
+ return (Map) invokeStaticMethodWithExceptionHandled(getHaNnRpcAddressesMethod, config);
+ }
+
+ private static Object invokeStaticMethodWithExceptionHandled(Method method, Object ... args) {
+ Preconditions.checkNotNull(method);
+ try {
+ return method.invoke(null, args);
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(e.getMessage(), e);
+ }
+ throw Throwables.propagate(e);
}
}
@@ -345,6 +415,7 @@ public class YarnUtils {
return null;
}
+
private YarnUtils() {
}
}