You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/05 00:14:11 UTC

hadoop git commit: YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/trunk e1feaf6db -> bcc85e3ba


YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcc85e3b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcc85e3b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcc85e3b

Branch: refs/heads/trunk
Commit: bcc85e3bab78bcacd430eac23141774465b96ef9
Parents: e1feaf6
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Sep 4 15:13:53 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 4 15:13:53 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../src/main/resources/yarn-default.xml         |   6 +
 .../resourcemanager/NodesListManager.java       | 142 ++++++++++++++++++-
 .../rmapp/TestNodesListManager.java             | 102 +++++++++++++
 5 files changed, 255 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 662106b..98cc98f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -813,6 +813,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor.
     (Naganarasimha G R via rohithsharmaks)
 
+    YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
+    (Hong Zhiguo via wangda)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a18ef7c..5e1bab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration {
       + "proxy-user-privileges.enabled";
   public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
 
+  /** Expiry interval (secs) for node IP caching; values <= 0 disable it. */
+  public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX
+      + "node-ip-cache.expiry-interval-secs";
+  public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1;
+
   /**
    * How many diagnostics/failure messages can be saved in RM for
    * log aggregation. It also defines the number of diagnostics/failure

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 62ba599..436bfb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -273,6 +273,12 @@
   </property>
 
   <property>
+    <description>The expiry interval, in seconds, for node IP caching. Any non-positive value (such as the default -1) disables the caching.</description>
+    <name>yarn.resourcemanager.node-ip-cache.expiry-interval-secs</name>
+    <value>-1</value>
+  </property>
+
+  <property>
     <description>Number of threads to handle resource tracker calls.</description>
     <name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
     <value>50</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index b9c76fb..abea85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -24,13 +24,18 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 @SuppressWarnings("unchecked")
-public class NodesListManager extends AbstractService implements
+public class NodesListManager extends CompositeService implements
     EventHandler<NodesListManagerEvent> {
 
   private static final Log LOG = LogFactory.getLog(NodesListManager.class);
@@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements
   private String includesFile;
   private String excludesFile;
 
+  private Resolver resolver;
+
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
     this.rmContext = rmContext;
@@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements
 
     this.conf = conf;
 
+    int nodeIpCacheTimeout = conf.getInt(
+        YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS);
+    if (nodeIpCacheTimeout <= 0) {
+      resolver = new DirectResolver();
+    } else {
+      resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout);
+      addIfService(resolver);
+    }
+
     // Read the hosts/exclude files to restrict access to the RM
     try {
       this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
@@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements
     ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
   }
 
+  @VisibleForTesting
+  public Resolver getResolver() {
+    return resolver;
+  }
+
+  @VisibleForTesting
+  public interface Resolver {
+    // Try to resolve hostName to an IP address; fall back to hostName on failure
+    String resolve(String hostName);
+  }
+
+  @VisibleForTesting
+  public static class DirectResolver implements Resolver {
+    @Override
+    public String resolve(String hostName) {
+      return NetUtils.normalizeHostName(hostName);
+    }
+  }
+
+  @VisibleForTesting
+  public static class CachedResolver extends AbstractService
+      implements Resolver {
+    private static class CacheEntry {
+      public String ip;
+      public long resolveTime;
+      public CacheEntry(String ip, long resolveTime) {
+        this.ip = ip;
+        this.resolveTime = resolveTime;
+      }
+    }
+    private Map<String, CacheEntry> cache =
+        new ConcurrentHashMap<String, CacheEntry>();
+    private int expiryIntervalMs;
+    private int checkIntervalMs;
+    private final Clock clock;
+    private Timer checkingTimer;
+    private TimerTask expireChecker = new ExpireChecker();
+
+    public CachedResolver(Clock clock, int expiryIntervalSecs) {
+      super("NodesListManager.CachedResolver");
+      this.clock = clock;
+      this.expiryIntervalMs = expiryIntervalSecs * 1000;
+      checkIntervalMs = expiryIntervalMs/3;
+      checkingTimer = new Timer(
+          "Timer-NodesListManager.CachedResolver.ExpireChecker", true);
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      checkingTimer.scheduleAtFixedRate(
+          expireChecker, checkIntervalMs, checkIntervalMs);
+      super.serviceStart();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      checkingTimer.cancel();
+      super.serviceStop();
+    }
+
+    @VisibleForTesting
+    public void addToCache(String hostName, String ip) {
+      cache.put(hostName, new CacheEntry(ip, clock.getTime()));
+    }
+
+    public void removeFromCache(String hostName) {
+      cache.remove(hostName);
+    }
+
+    private String reload(String hostName) {
+      String ip = NetUtils.normalizeHostName(hostName);
+      addToCache(hostName, ip);
+      return ip;
+    }
+
+    @Override
+    public String resolve(String hostName) {
+      CacheEntry e = cache.get(hostName);
+      if (e != null) {
+        return e.ip;
+      }
+      return reload(hostName);
+    }
+
+    @VisibleForTesting
+    public TimerTask getExpireChecker() {
+      return expireChecker;
+    }
+
+    private class ExpireChecker extends TimerTask {
+      @Override
+      public void run() {
+        long currentTime = clock.getTime();
+        Iterator<Map.Entry<String, CacheEntry>> iterator =
+            cache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<String, CacheEntry> entry = iterator.next();
+          if (currentTime >
+              entry.getValue().resolveTime +
+                  CachedResolver.this.expiryIntervalMs) {
+            iterator.remove();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip +
+                  "] Expired after " +
+                  CachedResolver.this.expiryIntervalMs / 1000 + " secs");
+            }
+          }
+        }
+      }
+    }
+  }
+
   public boolean isValidNode(String hostName) {
+    String ip = resolver.resolve(hostName);
     synchronized (hostsReader) {
       Set<String> hostsList = hostsReader.getHosts();
       Set<String> excludeList = hostsReader.getExcludedHosts();
-      String ip = NetUtils.normalizeHostName(hostName);
       return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
           .contains(ip))
           && !(excludeList.contains(hostName) || excludeList.contains(ip));
     }
   }
-  
+
   /**
    * Provides the currently unusable nodes. Copies it into provided collection.
    * @param unUsableNodes
@@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements
     default:
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
+    // drop the cached IP for this node's hostname if caching is enabled
+    if (resolver instanceof CachedResolver) {
+      ((CachedResolver)resolver).removeFromCache(
+          eventNode.getNodeID().getHost());
+    }
   }
 
   private void disableHostsFileReader(Exception ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc85e3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
index 5330976..2f57dbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -130,6 +132,106 @@ public class TestNodesListManager {
 
   }
 
+  @Test
+  public void testCachedResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    final int CACHE_EXPIRY_INTERVAL_SECS = 30;
+    NodesListManager.CachedResolver resolver =
+        new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
+    resolver.init(new YarnConfiguration());
+    resolver.start();
+    resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+
+    resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    // test removeFromCache
+    resolver.removeFromCache("testCachedResolverHost1");
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    // test expiry
+    clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
+    resolver.getExpireChecker().run();
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertNotEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+  }
+
+  @Test
+  public void testDefaultResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+
+    NodesListManager.Resolver resolver = nodesListManager.getResolver();
+    Assert.assertTrue("default resolver should be DirectResolver",
+        resolver instanceof NodesListManager.DirectResolver);
+  }
+
+  @Test
+  public void testCachedResolverWithEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    nodesListManager.init(conf);
+    nodesListManager.start();
+
+    NodesListManager.CachedResolver resolver =
+        (NodesListManager.CachedResolver)nodesListManager.getResolver();
+
+    resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+    resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+        1, "testCachedResolverHost1", 1234);
+    RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+        1, "testCachedResolverHost2", 1234);
+
+    nodesListManager.handle(
+        new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+            rmnode1));
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    nodesListManager.handle(
+        new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+            rmnode2));
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertNotEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+  }
+
   /*
    * Create dispatcher object
    */