You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/05 00:14:11 UTC
hadoop git commit: YARN-4024. YARN RM should avoid unnecessary
resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)
Repository: hadoop
Updated Branches:
refs/heads/trunk e1feaf6db -> bcc85e3ba
YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcc85e3b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcc85e3b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcc85e3b
Branch: refs/heads/trunk
Commit: bcc85e3bab78bcacd430eac23141774465b96ef9
Parents: e1feaf6
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Sep 4 15:13:53 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 4 15:13:53 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../src/main/resources/yarn-default.xml | 6 +
.../resourcemanager/NodesListManager.java | 142 ++++++++++++++++++-
.../rmapp/TestNodesListManager.java | 102 +++++++++++++
5 files changed, 255 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 662106b..98cc98f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -813,6 +813,9 @@ Release 2.8.0 - UNRELEASED
YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor.
(Naganarasimha G R via rohithsharmaks)
+ YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
+ (Hong Zhiguo via wangda)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a18ef7c..5e1bab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration {
+ "proxy-user-privileges.enabled";
public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+  /**
+   * The expiry interval, in seconds, for the ResourceManager's cache of
+   * node host name to IP address resolutions. Any non-positive value
+   * (the default is -1) disables the caching.
+   */
+  public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX
+      + "node-ip-cache.expiry-interval-secs";
+  /** Default for the node IP cache expiry interval: caching disabled. */
+  public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1;
+
/**
* How many diagnostics/failure messages can be saved in RM for
* log aggregation. It also defines the number of diagnostics/failure
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 62ba599..436bfb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -273,6 +273,12 @@
</property>
<property>
+ <description>The expiry interval, in seconds, for the ResourceManager's cache of node IP addresses. Any non-positive value (such as the default, -1) disables the caching.</description>
+ <name>yarn.resourcemanager.node-ip-cache.expiry-interval-secs</name>
+ <value>-1</value>
+ </property>
+
+ <property>
<description>Number of threads to handle resource tracker calls.</description>
<name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
<value>50</value>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index b9c76fb..abea85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -24,13 +24,18 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
@SuppressWarnings("unchecked")
-public class NodesListManager extends AbstractService implements
+public class NodesListManager extends CompositeService implements
EventHandler<NodesListManagerEvent> {
private static final Log LOG = LogFactory.getLog(NodesListManager.class);
@@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements
private String includesFile;
private String excludesFile;
+ private Resolver resolver;
+
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
this.rmContext = rmContext;
@@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements
this.conf = conf;
+ int nodeIpCacheTimeout = conf.getInt(
+ YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS);
+ if (nodeIpCacheTimeout <= 0) {
+ resolver = new DirectResolver();
+ } else {
+ resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout);
+ addIfService(resolver);
+ }
+
// Read the hosts/exclude files to restrict access to the RM
try {
this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
@@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
}
+  /**
+   * Returns the resolver chosen from the configured node IP cache expiry
+   * interval: a caching resolver when the interval is positive, otherwise
+   * a direct (uncached) one.
+   */
+  @VisibleForTesting
+  public Resolver getResolver() {
+    return this.resolver;
+  }
+
+  /** Maps a node host name to the address used for include/exclude checks. */
+  @VisibleForTesting
+  public interface Resolver {
+    /**
+     * Tries to resolve {@code hostName} to an IP address, falling back to
+     * {@code hostName} itself if resolution fails.
+     */
+    String resolve(String hostName);
+  }
+
+  /**
+   * Resolver that delegates every call to
+   * {@link NetUtils#normalizeHostName(String)} without caching anything.
+   */
+  @VisibleForTesting
+  public static class DirectResolver implements Resolver {
+    @Override
+    public String resolve(String hostName) {
+      final String normalized = NetUtils.normalizeHostName(hostName);
+      return normalized;
+    }
+  }
+
+  /**
+   * Resolver that caches host name to IP results and evicts entries older
+   * than a configured expiry interval.
+   *
+   * <p>While the service is started, a daemon {@link Timer} runs the expiry
+   * check every third of the expiry interval, so an entry may survive for up
+   * to roughly 4/3 of the interval. Entries can also be dropped explicitly
+   * with {@link #removeFromCache(String)}.
+   */
+  @VisibleForTesting
+  public static class CachedResolver extends AbstractService
+      implements Resolver {
+    /** A resolved IP together with the clock time at which it was cached. */
+    private static class CacheEntry {
+      public String ip;
+      public long resolveTime;
+      public CacheEntry(String ip, long resolveTime) {
+        this.ip = ip;
+        this.resolveTime = resolveTime;
+      }
+    }
+    private final Map<String, CacheEntry> cache =
+        new ConcurrentHashMap<String, CacheEntry>();
+    // long, not int: expiryIntervalSecs * 1000 overflows an int for
+    // intervals above ~24.8 days, which made the timer period negative and
+    // caused scheduleAtFixedRate to throw at service start.
+    private final long expiryIntervalMs;
+    private final long checkIntervalMs;
+    private final Clock clock;
+    private Timer checkingTimer;
+    private final TimerTask expireChecker = new ExpireChecker();
+
+    /**
+     * @param clock source of the timestamps used to age cache entries
+     * @param expiryIntervalSecs how long, in seconds, a cached IP stays
+     *        valid; must be positive
+     * @throws IllegalArgumentException if {@code expiryIntervalSecs} is not
+     *         positive (the checking timer needs a positive period)
+     */
+    public CachedResolver(Clock clock, int expiryIntervalSecs) {
+      super("NodesListManager.CachedResolver");
+      if (expiryIntervalSecs <= 0) {
+        throw new IllegalArgumentException(
+            "expiryIntervalSecs must be positive: " + expiryIntervalSecs);
+      }
+      this.clock = clock;
+      this.expiryIntervalMs = expiryIntervalSecs * 1000L;
+      this.checkIntervalMs = expiryIntervalMs / 3;
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      // Create the timer here rather than in the constructor so that no
+      // thread is spawned for a resolver that is never started.
+      checkingTimer = new Timer(
+          "Timer-NodesListManager.CachedResolver.ExpireChecker", true);
+      checkingTimer.scheduleAtFixedRate(
+          expireChecker, checkIntervalMs, checkIntervalMs);
+      super.serviceStart();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      // The timer only exists once serviceStart has run.
+      if (checkingTimer != null) {
+        checkingTimer.cancel();
+      }
+      super.serviceStop();
+    }
+
+    /** Caches {@code ip} for {@code hostName}, stamped with the clock time. */
+    @VisibleForTesting
+    public void addToCache(String hostName, String ip) {
+      cache.put(hostName, new CacheEntry(ip, clock.getTime()));
+    }
+
+    /** Drops any cached entry for {@code hostName}. */
+    public void removeFromCache(String hostName) {
+      cache.remove(hostName);
+    }
+
+    /** Resolves {@code hostName} and caches the result. */
+    private String reload(String hostName) {
+      String ip = NetUtils.normalizeHostName(hostName);
+      addToCache(hostName, ip);
+      return ip;
+    }
+
+    /** Returns the cached IP for {@code hostName}, resolving it on a miss. */
+    @Override
+    public String resolve(String hostName) {
+      CacheEntry e = cache.get(hostName);
+      if (e != null) {
+        return e.ip;
+      }
+      return reload(hostName);
+    }
+
+    @VisibleForTesting
+    public TimerTask getExpireChecker() {
+      return expireChecker;
+    }
+
+    /** Evicts every entry older than the expiry interval. */
+    private class ExpireChecker extends TimerTask {
+      @Override
+      public void run() {
+        long currentTime = clock.getTime();
+        Iterator<Map.Entry<String, CacheEntry>> iterator =
+            cache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<String, CacheEntry> entry = iterator.next();
+          if (currentTime >
+              entry.getValue().resolveTime +
+                  CachedResolver.this.expiryIntervalMs) {
+            iterator.remove();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip +
+                  "] Expired after " +
+                  CachedResolver.this.expiryIntervalMs / 1000 + " secs");
+            }
+          }
+        }
+      }
+    }
+  }
+
public boolean isValidNode(String hostName) {
+    // Resolve before entering synchronized (hostsReader) so that a
+    // potentially slow lookup does not hold that lock and block other
+    // threads waiting on it.
+    String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
-    String ip = NetUtils.normalizeHostName(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
}
-
+
/**
* Provides the currently unusable nodes. Copies it into provided collection.
* @param unUsableNodes
@@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
+ // remove the cache of normalized hostname if enabled
+ if (resolver instanceof CachedResolver) {
+ ((CachedResolver)resolver).removeFromCache(
+ eventNode.getNodeID().getHost());
+ }
}
private void disableHostsFileReader(Exception ex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
index 5330976..2f57dbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -130,6 +132,106 @@ public class TestNodesListManager {
}
+  @Test
+  public void testCachedResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    Level originalLevel = rootLogger.getLevel();
+    rootLogger.setLevel(Level.DEBUG);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    final int CACHE_EXPIRY_INTERVAL_SECS = 30;
+    NodesListManager.CachedResolver resolver =
+        new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
+    resolver.init(new YarnConfiguration());
+    resolver.start();
+    try {
+      resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+
+      resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      // test removeFromCache
+      resolver.removeFromCache("testCachedResolverHost1");
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      // test expiry: advance the controlled clock past the interval and run
+      // the checker directly instead of waiting for the timer to fire.
+      clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
+      resolver.getExpireChecker().run();
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertNotEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+    } finally {
+      // Stop the resolver (cancelling its expiry timer) and restore the
+      // global root log level so later tests are not affected.
+      resolver.stop();
+      rootLogger.setLevel(originalLevel);
+    }
+  }
+
+  @Test
+  public void testDefaultResolver() throws Exception {
+    // Nothing here depends on debug logging, so the global root log level
+    // is deliberately left unchanged.
+    YarnConfiguration conf = new YarnConfiguration();
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+
+    // No expiry interval is configured, so IP caching must be disabled.
+    NodesListManager.Resolver resolver = nodesListManager.getResolver();
+    Assert.assertTrue("default resolver should be DirectResolver",
+        resolver instanceof NodesListManager.DirectResolver);
+  }
+
+  @Test
+  public void testCachedResolverWithEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    Level originalLevel = rootLogger.getLevel();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    nodesListManager.init(conf);
+    nodesListManager.start();
+    try {
+      NodesListManager.CachedResolver resolver =
+          (NodesListManager.CachedResolver)nodesListManager.getResolver();
+
+      resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+      resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+          1, "testCachedResolverHost1", 1234);
+      RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+          1, "testCachedResolverHost2", 1234);
+
+      // Handling a node event must invalidate only that node's cache entry.
+      nodesListManager.handle(
+          new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+              rmnode1));
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      nodesListManager.handle(
+          new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+              rmnode2));
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertNotEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+    } finally {
+      // Stop the manager started above and restore the global root log
+      // level so later tests are not affected.
+      nodesListManager.stop();
+      rootLogger.setLevel(originalLevel);
+    }
+  }
+
/*
* Create dispatcher object
*/