You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zx...@apache.org on 2016/03/07 04:46:52 UTC
hadoop git commit: YARN-4761. NMs reconnecting with changed
capabilities can lead to wrong cluster resource calculations on fair
scheduler. Contributed by Sangjin Lee
Repository: hadoop
Updated Branches:
refs/heads/trunk 19ee18590 -> e1ccc9622
YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1ccc962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1ccc962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1ccc962
Branch: refs/heads/trunk
Commit: e1ccc9622b2f1fbefea1862fa74d1fb56d8eb264
Parents: 19ee185
Author: Zhihai Xu <zx...@apache.org>
Authored: Sun Mar 6 19:46:09 2016 -0800
Committer: Zhihai Xu <zx...@apache.org>
Committed: Sun Mar 6 19:46:09 2016 -0800
----------------------------------------------------------------------
.../scheduler/fair/FairScheduler.java | 4 +-
.../scheduler/TestAbstractYarnScheduler.java | 132 +++++++++++++++++++
.../capacity/TestCapacityScheduler.java | 110 ----------------
3 files changed, 134 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 2801bee..917fc8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -889,7 +889,7 @@ public class FairScheduler extends
} else {
nodesPerRack.put(rackName, 1);
}
- Resources.addTo(clusterResource, node.getTotalCapability());
+ Resources.addTo(clusterResource, schedulerNode.getTotalResource());
updateMaximumAllocation(schedulerNode, true);
triggerUpdate();
@@ -909,7 +909,7 @@ public class FairScheduler extends
if (node == null) {
return;
}
- Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
+ Resources.subtractFrom(clusterResource, node.getTotalResource());
updateRootQueueMetrics();
triggerUpdate();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 8411a4d..e7ba58d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -38,11 +40,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,12 +66,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@@ -615,4 +637,114 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
schedulerMaximumResourceCapability.getVirtualCores());
}
+
+ private class SleepHandler implements EventHandler<SchedulerEvent> {
+ boolean sleepFlag = false;
+ int sleepTime = 20;
+ @Override
+ public void handle(SchedulerEvent event) {
+ try {
+ if (sleepFlag) {
+ Thread.sleep(sleepTime);
+ }
+ } catch(InterruptedException ie) {
+ }
+ }
+ }
+
+ private ResourceTrackerService getPrivateResourceTrackerService(
+ Dispatcher privateDispatcher, ResourceManager rm,
+ SleepHandler sleepHandler) {
+ Configuration conf = getConf();
+
+ RMContext privateContext =
+ new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
+ null, null, null);
+ privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
+
+ privateDispatcher.register(SchedulerEventType.class, sleepHandler);
+ privateDispatcher.register(SchedulerEventType.class,
+ rm.getResourceScheduler());
+ privateDispatcher.register(RMNodeEventType.class,
+ new ResourceManager.NodeEventDispatcher(privateContext));
+ ((Service) privateDispatcher).init(conf);
+ ((Service) privateDispatcher).start();
+ NMLivelinessMonitor nmLivelinessMonitor =
+ new NMLivelinessMonitor(privateDispatcher);
+ nmLivelinessMonitor.init(conf);
+ nmLivelinessMonitor.start();
+ NodesListManager nodesListManager = new NodesListManager(privateContext);
+ nodesListManager.init(conf);
+ RMContainerTokenSecretManager containerTokenSecretManager =
+ new RMContainerTokenSecretManager(conf);
+ containerTokenSecretManager.start();
+ NMTokenSecretManagerInRM nmTokenSecretManager =
+ new NMTokenSecretManagerInRM(conf);
+ nmTokenSecretManager.start();
+ ResourceTrackerService privateResourceTrackerService =
+ new ResourceTrackerService(privateContext, nodesListManager,
+ nmLivelinessMonitor, containerTokenSecretManager,
+ nmTokenSecretManager);
+ privateResourceTrackerService.init(conf);
+ privateResourceTrackerService.start();
+ rm.getResourceScheduler().setRMContext(privateContext);
+ return privateResourceTrackerService;
+ }
+
+ /**
+ * Test the behavior of the scheduler when a node reconnects
+ * with changed capabilities. This test is to catch any race conditions
+ * that might occur due to the use of the RMNode object.
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testNodemanagerReconnect() throws Exception {
+ configureScheduler();
+ Configuration conf = getConf();
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+ DrainDispatcher privateDispatcher = new DrainDispatcher();
+ SleepHandler sleepHandler = new SleepHandler();
+ ResourceTrackerService privateResourceTrackerService =
+ getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
+
+ // Register node1
+ String hostname1 = "localhost1";
+ Resource capability = BuilderUtils.newResource(4096, 4);
+ RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ RegisterNodeManagerRequest request1 =
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+ NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+ request1.setNodeId(nodeId1);
+ request1.setHttpPort(0);
+ request1.setResource(capability);
+ privateResourceTrackerService.registerNodeManager(request1);
+ privateDispatcher.await();
+ Resource clusterResource =
+ rm.getResourceScheduler().getClusterResource();
+ Assert.assertEquals("Initial cluster resources don't match", capability,
+ clusterResource);
+
+ Resource newCapability = BuilderUtils.newResource(1024, 1);
+ RegisterNodeManagerRequest request2 =
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+ request2.setNodeId(nodeId1);
+ request2.setHttpPort(0);
+ request2.setResource(newCapability);
+ // hold up the dispatcher and register the same node with lower capability
+ sleepHandler.sleepFlag = true;
+ privateResourceTrackerService.registerNodeManager(request2);
+ privateDispatcher.await();
+ Assert.assertEquals("Cluster resources don't match", newCapability,
+ rm.getResourceScheduler().getClusterResource());
+ privateResourceTrackerService.stop();
+ } finally {
+ rm.stop();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1ccc962/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index c8c97e9..b6c005b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -74,7 +73,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +80,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
@@ -90,13 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@@ -116,7 +110,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -136,7 +129,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -3285,108 +3277,6 @@ public class TestCapacityScheduler {
}
}
- private class SleepHandler implements EventHandler<SchedulerEvent> {
- boolean sleepFlag = false;
- int sleepTime = 20;
- @Override
- public void handle(SchedulerEvent event) {
- try {
- if(sleepFlag) {
- Thread.sleep(sleepTime);
- }
- }
- catch(InterruptedException ie) {
- }
- }
- }
-
- private ResourceTrackerService getPrivateResourceTrackerService(
- Dispatcher privateDispatcher, SleepHandler sleepHandler) {
-
- Configuration conf = new Configuration();
- ResourceTrackerService privateResourceTrackerService;
-
- RMContext privateContext =
- new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
- null, null, null);
- privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
-
- privateDispatcher.register(SchedulerEventType.class, sleepHandler);
- privateDispatcher.register(SchedulerEventType.class,
- resourceManager.getResourceScheduler());
- privateDispatcher.register(RMNodeEventType.class,
- new ResourceManager.NodeEventDispatcher(privateContext));
- ((Service) privateDispatcher).init(conf);
- ((Service) privateDispatcher).start();
- NMLivelinessMonitor nmLivelinessMonitor =
- new NMLivelinessMonitor(privateDispatcher);
- nmLivelinessMonitor.init(conf);
- nmLivelinessMonitor.start();
- NodesListManager nodesListManager = new NodesListManager(privateContext);
- nodesListManager.init(conf);
- RMContainerTokenSecretManager containerTokenSecretManager =
- new RMContainerTokenSecretManager(conf);
- containerTokenSecretManager.start();
- NMTokenSecretManagerInRM nmTokenSecretManager =
- new NMTokenSecretManagerInRM(conf);
- nmTokenSecretManager.start();
- privateResourceTrackerService =
- new ResourceTrackerService(privateContext, nodesListManager,
- nmLivelinessMonitor, containerTokenSecretManager,
- nmTokenSecretManager);
- privateResourceTrackerService.init(conf);
- privateResourceTrackerService.start();
- resourceManager.getResourceScheduler().setRMContext(privateContext);
- return privateResourceTrackerService;
- }
-
- /**
- * Test the behaviour of the capacity scheduler when a node reconnects
- * with changed capabilities. This test is to catch any race conditions
- * that might occur due to the use of the RMNode object.
- * @throws Exception
- */
- @Test
- public void testNodemanagerReconnect() throws Exception {
-
- DrainDispatcher privateDispatcher = new DrainDispatcher();
- SleepHandler sleepHandler = new SleepHandler();
- ResourceTrackerService privateResourceTrackerService =
- getPrivateResourceTrackerService(privateDispatcher,
- sleepHandler);
-
- // Register node1
- String hostname1 = "localhost1";
- Resource capability = BuilderUtils.newResource(4096, 4);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- RegisterNodeManagerRequest request1 =
- recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
- NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
- request1.setNodeId(nodeId1);
- request1.setHttpPort(0);
- request1.setResource(capability);
- privateResourceTrackerService.registerNodeManager(request1);
- privateDispatcher.await();
- Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
- Assert.assertEquals("Initial cluster resources don't match", capability,
- clusterResource);
-
- Resource newCapability = BuilderUtils.newResource(1024, 1);
- RegisterNodeManagerRequest request2 =
- recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
- request2.setNodeId(nodeId1);
- request2.setHttpPort(0);
- request2.setResource(newCapability);
- // hold up the dispatcher and register the same node with lower capability
- sleepHandler.sleepFlag = true;
- privateResourceTrackerService.registerNodeManager(request2);
- privateDispatcher.await();
- Assert.assertEquals("Cluster resources don't match", newCapability,
- resourceManager.getResourceScheduler().getClusterResource());
- privateResourceTrackerService.stop();
- }
-
@Test
public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode