You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/09 18:45:03 UTC

[03/34] hadoop git commit: YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee

YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1ccc962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1ccc962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1ccc962

Branch: refs/heads/HDFS-1312
Commit: e1ccc9622b2f1fbefea1862fa74d1fb56d8eb264
Parents: 19ee185
Author: Zhihai Xu <zx...@apache.org>
Authored: Sun Mar 6 19:46:09 2016 -0800
Committer: Zhihai Xu <zx...@apache.org>
Committed: Sun Mar 6 19:46:09 2016 -0800

----------------------------------------------------------------------
 .../scheduler/fair/FairScheduler.java           |   4 +-
 .../scheduler/TestAbstractYarnScheduler.java    | 132 +++++++++++++++++++
 .../capacity/TestCapacityScheduler.java         | 110 ----------------
 3 files changed, 134 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 2801bee..917fc8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -889,7 +889,7 @@ public class FairScheduler extends
     } else {
       nodesPerRack.put(rackName, 1);
     }
-    Resources.addTo(clusterResource, node.getTotalCapability());
+    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
     updateMaximumAllocation(schedulerNode, true);
 
     triggerUpdate();
@@ -909,7 +909,7 @@ public class FairScheduler extends
     if (node == null) {
       return;
     }
-    Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getTotalResource());
     updateRootQueueMetrics();
 
     triggerUpdate();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 8411a4d..e7ba58d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -38,11 +40,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,12 +66,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 @SuppressWarnings("unchecked")
 public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@@ -615,4 +637,114 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
     Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
         schedulerMaximumResourceCapability.getVirtualCores());
   }
+
+  private class SleepHandler implements EventHandler<SchedulerEvent> {
+    boolean sleepFlag = false;
+    int sleepTime = 20;
+    @Override
+    public void handle(SchedulerEvent event) {
+      try {
+        if (sleepFlag) {
+          Thread.sleep(sleepTime);
+        }
+      } catch(InterruptedException ie) {
+      }
+    }
+  }
+
+  private ResourceTrackerService getPrivateResourceTrackerService(
+      Dispatcher privateDispatcher, ResourceManager rm,
+      SleepHandler sleepHandler) {
+    Configuration conf = getConf();
+
+    RMContext privateContext =
+        new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
+            null, null, null);
+    privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
+
+    privateDispatcher.register(SchedulerEventType.class, sleepHandler);
+    privateDispatcher.register(SchedulerEventType.class,
+        rm.getResourceScheduler());
+    privateDispatcher.register(RMNodeEventType.class,
+        new ResourceManager.NodeEventDispatcher(privateContext));
+    ((Service) privateDispatcher).init(conf);
+    ((Service) privateDispatcher).start();
+    NMLivelinessMonitor nmLivelinessMonitor =
+        new NMLivelinessMonitor(privateDispatcher);
+    nmLivelinessMonitor.init(conf);
+    nmLivelinessMonitor.start();
+    NodesListManager nodesListManager = new NodesListManager(privateContext);
+    nodesListManager.init(conf);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.start();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.start();
+    ResourceTrackerService privateResourceTrackerService =
+        new ResourceTrackerService(privateContext, nodesListManager,
+            nmLivelinessMonitor, containerTokenSecretManager,
+            nmTokenSecretManager);
+    privateResourceTrackerService.init(conf);
+    privateResourceTrackerService.start();
+    rm.getResourceScheduler().setRMContext(privateContext);
+    return privateResourceTrackerService;
+  }
+
+  /**
+   * Test the behavior of the scheduler when a node reconnects
+   * with changed capabilities. This test is to catch any race conditions
+   * that might occur due to the use of the RMNode object.
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testNodemanagerReconnect() throws Exception {
+    configureScheduler();
+    Configuration conf = getConf();
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+      DrainDispatcher privateDispatcher = new DrainDispatcher();
+      SleepHandler sleepHandler = new SleepHandler();
+      ResourceTrackerService privateResourceTrackerService =
+          getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
+
+      // Register node1
+      String hostname1 = "localhost1";
+      Resource capability = BuilderUtils.newResource(4096, 4);
+      RecordFactory recordFactory =
+          RecordFactoryProvider.getRecordFactory(null);
+
+      RegisterNodeManagerRequest request1 =
+          recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+      NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+      request1.setNodeId(nodeId1);
+      request1.setHttpPort(0);
+      request1.setResource(capability);
+      privateResourceTrackerService.registerNodeManager(request1);
+      privateDispatcher.await();
+      Resource clusterResource =
+          rm.getResourceScheduler().getClusterResource();
+      Assert.assertEquals("Initial cluster resources don't match", capability,
+          clusterResource);
+
+      Resource newCapability = BuilderUtils.newResource(1024, 1);
+      RegisterNodeManagerRequest request2 =
+          recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+      request2.setNodeId(nodeId1);
+      request2.setHttpPort(0);
+      request2.setResource(newCapability);
+      // hold up the dispatcher and register the same node with lower capability
+      sleepHandler.sleepFlag = true;
+      privateResourceTrackerService.registerNodeManager(request2);
+      privateDispatcher.await();
+      Assert.assertEquals("Cluster resources don't match", newCapability,
+          rm.getResourceScheduler().getClusterResource());
+      privateResourceTrackerService.stop();
+    } finally {
+      rm.stop();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index c8c97e9..b6c005b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -74,7 +73,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +80,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
@@ -90,13 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@@ -116,7 +110,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -136,7 +129,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -3285,108 +3277,6 @@ public class TestCapacityScheduler {
     }
   }
 
-  private class SleepHandler implements EventHandler<SchedulerEvent> {
-    boolean sleepFlag = false;
-    int sleepTime = 20;
-    @Override
-    public void handle(SchedulerEvent event) {
-      try {
-        if(sleepFlag) {
-          Thread.sleep(sleepTime);
-        }
-      }
-      catch(InterruptedException ie) {
-      }
-    }
-  }
-
-  private ResourceTrackerService getPrivateResourceTrackerService(
-      Dispatcher privateDispatcher, SleepHandler sleepHandler) {
-
-    Configuration conf = new Configuration();
-    ResourceTrackerService privateResourceTrackerService;
-
-    RMContext privateContext =
-        new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
-            null, null, null);
-    privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
-
-    privateDispatcher.register(SchedulerEventType.class, sleepHandler);
-    privateDispatcher.register(SchedulerEventType.class,
-        resourceManager.getResourceScheduler());
-    privateDispatcher.register(RMNodeEventType.class,
-        new ResourceManager.NodeEventDispatcher(privateContext));
-    ((Service) privateDispatcher).init(conf);
-    ((Service) privateDispatcher).start();
-    NMLivelinessMonitor nmLivelinessMonitor =
-        new NMLivelinessMonitor(privateDispatcher);
-    nmLivelinessMonitor.init(conf);
-    nmLivelinessMonitor.start();
-    NodesListManager nodesListManager = new NodesListManager(privateContext);
-    nodesListManager.init(conf);
-    RMContainerTokenSecretManager containerTokenSecretManager =
-        new RMContainerTokenSecretManager(conf);
-    containerTokenSecretManager.start();
-    NMTokenSecretManagerInRM nmTokenSecretManager =
-        new NMTokenSecretManagerInRM(conf);
-    nmTokenSecretManager.start();
-    privateResourceTrackerService =
-        new ResourceTrackerService(privateContext, nodesListManager,
-            nmLivelinessMonitor, containerTokenSecretManager,
-            nmTokenSecretManager);
-    privateResourceTrackerService.init(conf);
-    privateResourceTrackerService.start();
-    resourceManager.getResourceScheduler().setRMContext(privateContext);
-    return privateResourceTrackerService;
-  }
-
-  /**
-   * Test the behaviour of the capacity scheduler when a node reconnects
-   * with changed capabilities. This test is to catch any race conditions
-   * that might occur due to the use of the RMNode object.
-   * @throws Exception
-   */
-  @Test
-  public void testNodemanagerReconnect() throws Exception {
-
-    DrainDispatcher privateDispatcher = new DrainDispatcher();
-    SleepHandler sleepHandler = new SleepHandler();
-    ResourceTrackerService privateResourceTrackerService =
-        getPrivateResourceTrackerService(privateDispatcher,
-            sleepHandler);
-
-    // Register node1
-    String hostname1 = "localhost1";
-    Resource capability = BuilderUtils.newResource(4096, 4);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-    RegisterNodeManagerRequest request1 =
-        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
-    request1.setNodeId(nodeId1);
-    request1.setHttpPort(0);
-    request1.setResource(capability);
-    privateResourceTrackerService.registerNodeManager(request1);
-    privateDispatcher.await();
-    Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
-    Assert.assertEquals("Initial cluster resources don't match", capability,
-        clusterResource);
-
-    Resource newCapability = BuilderUtils.newResource(1024, 1);
-    RegisterNodeManagerRequest request2 =
-        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request2.setNodeId(nodeId1);
-    request2.setHttpPort(0);
-    request2.setResource(newCapability);
-    // hold up the disaptcher and register the same node with lower capability
-    sleepHandler.sleepFlag = true;
-    privateResourceTrackerService.registerNodeManager(request2);
-    privateDispatcher.await();
-    Assert.assertEquals("Cluster resources don't match", newCapability,
-        resourceManager.getResourceScheduler().getClusterResource());
-    privateResourceTrackerService.stop();
-  }
-
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode