You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2015/06/18 23:38:28 UTC

hadoop git commit: YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes after NM is reconnected. Contributed by zhihai xu

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6e0a9f92f -> 5b5bb8dcd


YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
after NM is reconnected. Contributed by zhihai xu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b5bb8dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b5bb8dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b5bb8dc

Branch: refs/heads/trunk
Commit: 5b5bb8dcdc888ba1ebc7e4eba0fa0e7e79edda9a
Parents: 6e0a9f9
Author: Xuan <xg...@apache.org>
Authored: Thu Jun 18 14:37:49 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Thu Jun 18 14:37:49 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  8 ++-
 .../resourcetracker/TestNMReconnect.java        | 67 +++++++++++++++++++-
 3 files changed, 74 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b5bb8dc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f00170e..d89c285 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -539,6 +539,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3824. Fix two minor nits in member variable properties
     of YarnConfiguration. (Ray Chiang via devaraj)
 
+    YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
+    after NM is reconnected. (zhihai xu via xgong)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b5bb8dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 8a810cb..d1e6190 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -597,10 +597,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         if (rmNode.getHttpPort() == newNode.getHttpPort()) {
           // Reset heartbeat ID since node just restarted.
           rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+          if (!rmNode.getTotalCapability().equals(
+              newNode.getTotalCapability())) {
+            rmNode.totalCapability = newNode.getTotalCapability();
+          }
           if (rmNode.getState().equals(NodeState.RUNNING)) {
-            // Only add new node if old state is RUNNING
+            // Only add old node if old state is RUNNING
             rmNode.context.getDispatcher().getEventHandler().handle(
-                new NodeAddedSchedulerEvent(newNode));
+                new NodeAddedSchedulerEvent(rmNode));
           }
         } else {
           // Reconnected node differs, so replace old node and start new node

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b5bb8dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
index d16d551..b525efc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
@@ -25,6 +25,9 @@ import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
@@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDi
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -51,6 +58,8 @@ public class TestNMReconnect {
       RecordFactoryProvider.getRecordFactory(null);
 
   private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
+  private Dispatcher dispatcher;
+  private RMContextImpl context;
 
   private class TestRMNodeEventDispatcher implements
       EventHandler<RMNodeEvent> {
@@ -68,12 +77,12 @@ public class TestNMReconnect {
   public void setUp() {
     Configuration conf = new Configuration();
     // Dispatcher that processes events inline
-    Dispatcher dispatcher = new InlineDispatcher();
+    dispatcher = new InlineDispatcher();
 
     dispatcher.register(RMNodeEventType.class,
         new TestRMNodeEventDispatcher());
 
-    RMContext context = new RMContextImpl(dispatcher, null,
+    context = new RMContextImpl(dispatcher, null,
         null, null, null, null, null, null, null, null);
     dispatcher.register(SchedulerEventType.class,
         new InlineDispatcher.EmptyEventHandler());
@@ -99,6 +108,11 @@ public class TestNMReconnect {
     resourceTrackerService.start();
   }
 
+  @After
+  public void tearDown() {
+    resourceTrackerService.stop();
+  }
+
   @Test
   public void testReconnect() throws Exception {
     String hostname1 = "localhost1";
@@ -126,4 +140,53 @@ public class TestNMReconnect {
     Assert.assertEquals(RMNodeEventType.RECONNECTED,
         rmNodeEvents.get(0).getType());
   }
+
+  @Test
+  public void testCompareRMNodeAfterReconnect() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(yarnConf);
+    ConfigurationProvider configurationProvider =
+        ConfigurationProviderFactory.getConfigurationProvider(yarnConf);
+    configurationProvider.init(yarnConf);
+    context.setConfigurationProvider(configurationProvider);
+    RMNodeLabelsManager nlm = new RMNodeLabelsManager();
+    nlm.init(yarnConf);
+    nlm.start();
+    context.setNodeLabelManager(nlm);
+    scheduler.setRMContext(context);
+    scheduler.init(yarnConf);
+    scheduler.start();
+    dispatcher.register(SchedulerEventType.class, scheduler);
+
+    String hostname1 = "localhost1";
+    Resource capability = BuilderUtils.newResource(4096, 4);
+
+    RegisterNodeManagerRequest request1 = recordFactory
+        .newRecordInstance(RegisterNodeManagerRequest.class);
+    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+    request1.setNodeId(nodeId1);
+    request1.setHttpPort(0);
+    request1.setResource(capability);
+    resourceTrackerService.registerNodeManager(request1);
+    Assert.assertNotNull(context.getRMNodes().get(nodeId1));
+    // verify Scheduler and RMContext use same RMNode reference.
+    Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
+        context.getRMNodes().get(nodeId1));
+    Assert.assertEquals(context.getRMNodes().get(nodeId1).
+        getTotalCapability(), capability);
+    Resource capability1 = BuilderUtils.newResource(2048, 2);
+    request1.setResource(capability1);
+    resourceTrackerService.registerNodeManager(request1);
+    Assert.assertNotNull(context.getRMNodes().get(nodeId1));
+    // verify Scheduler and RMContext use same RMNode reference
+    // after reconnect.
+    Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
+        context.getRMNodes().get(nodeId1));
+    // verify RMNode's capability is changed.
+    Assert.assertEquals(context.getRMNodes().get(nodeId1).
+        getTotalCapability(), capability1);
+    nlm.stop();
+    scheduler.stop();
+  }
 }