You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/09 05:09:22 UTC
svn commit: r1601303 [2/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-common/src/main/resources/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-res...
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Jun 9 03:09:21 2014
@@ -18,16 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -298,7 +298,10 @@ public class TestFifoScheduler {
FifoScheduler fs = new FifoScheduler();
fs.init(conf);
fs.start();
+ // mock rmContext to avoid NPE.
+ RMContext context = mock(RMContext.class);
fs.reinitialize(conf, null);
+ fs.setRMContext(context);
RMNode n1 =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Mon Jun 9 03:09:21 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
public class TestMoveApplication {
private ResourceManager resourceManager = null;
private static boolean failMove;
-
+ private Configuration conf;
+
@Before
public void setUp() throws Exception {
- Configuration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
FifoSchedulerWithMove.class);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
}
}
- @Test (timeout = 5000)
- public void testMoveSuccessful() throws Exception {
- // Submit application
- Application application = new Application("user1", resourceManager);
- ApplicationId appId = application.getApplicationId();
- application.submit();
-
- // Wait for app to be accepted
- RMApp app = resourceManager.rmContext.getRMApps().get(appId);
- while (app.getState() != RMAppState.ACCEPTED) {
- Thread.sleep(100);
- }
-
- ClientRMService clientRMService = resourceManager.getClientRMService();
+ @Test (timeout = 10000)
+ public
+ void testMoveSuccessful() throws Exception {
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app = rm1.submitApp(1024);
+ ClientRMService clientRMService = rm1.getClientRMService();
// FIFO scheduler does not support moves
- clientRMService.moveApplicationAcrossQueues(
- MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-
- RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+ clientRMService
+ .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+ .newInstance(app.getApplicationId(), "newqueue"));
+
+ RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
assertEquals("newqueue", rmApp.getQueue());
+ rm1.stop();
}
-
+
@Test
public void testMoveRejectedByPermissions() throws Exception {
failMove = true;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Jun 9 03:09:21 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
- node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node2.handle(new RMNodeStartedEvent(null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1601303&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Mon Jun 9 03:09:21 2014
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RunWith(value = Parameterized.class)
+public class TestWorkPreservingRMRestart {
+
+ private YarnConfiguration conf;
+ private Class<?> schedulerClass;
+ MockRM rm1 = null;
+ MockRM rm2 = null;
+
+ @Before
+ public void setup() throws UnknownHostException {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ conf = new YarnConfiguration();
+ UserGroupInformation.setConfiguration(conf);
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ }
+
+ @After
+ public void tearDown() {
+ if (rm1 != null) {
+ rm1.stop();
+ }
+ if (rm2 != null) {
+ rm2.stop();
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getTestParameters() {
+ return Arrays.asList(new Object[][] { { CapacityScheduler.class },
+ { FifoScheduler.class } });
+ }
+
+ public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
+ this.schedulerClass = schedulerClass;
+ }
+
+ // Test common scheduler state including SchedulerAttempt, SchedulerNode,
+ // AppSchedulingInfo can be reconstructed via the container recovery reports
+ // on NM re-registration.
+ // Also test scheduler specific changes: i.e. Queue recovery-
+ // CSQueue/FSQueue/FifoQueue recovery respectively.
+ // Test Strategy: send 3 container recovery reports(AMContainer, running
+ // container, completed container) on NM re-registration, check the states of
+ // SchedulerAttempt, SchedulerNode etc. are updated accordingly.
+ @Test(timeout = 20000)
+ public void testSchedulerRecovery() throws Exception {
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+ conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+
+ int containerMemory = 1024;
+ Resource containerResource = Resource.newInstance(containerMemory, 1);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // clear queue metrics
+ rm1.clearQueueMetrics(app1);
+
+ // Re-start RM
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ // recover app
+ RMApp recoveredApp1 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
+ NMContainerStatus amContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+ ContainerState.RUNNING);
+ NMContainerStatus runningContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+ ContainerState.RUNNING);
+ NMContainerStatus completedContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+ ContainerState.COMPLETE);
+
+ nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+ completedContainer));
+
+ // Wait for RM to settle down on recovering containers;
+ waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+
+ // check RMContainers are re-recreated and the container state is correct.
+ rm2.waitForState(nm1, amContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForState(nm1, runningContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
+
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm2.getResourceScheduler();
+ SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+ // ********* check scheduler node state.*******
+ // 2 running containers.
+ Resource usedResources = Resources.multiply(containerResource, 2);
+ Resource nmResource =
+ Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+
+ assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidContainer(runningContainer
+ .getContainerId()));
+ assertFalse(schedulerNode1.isValidContainer(completedContainer
+ .getContainerId()));
+ // 2 launched containers, 1 completed container
+ assertEquals(2, schedulerNode1.getNumContainers());
+
+ assertEquals(Resources.subtract(nmResource, usedResources),
+ schedulerNode1.getAvailableResource());
+ assertEquals(usedResources, schedulerNode1.getUsedResource());
+ Resource availableResources = Resources.subtract(nmResource, usedResources);
+
+ // ***** check queue state based on the underlying scheduler ********
+ Map<ApplicationId, SchedulerApplication> schedulerApps =
+ ((AbstractYarnScheduler) rm2.getResourceScheduler())
+ .getSchedulerApplications();
+ SchedulerApplication schedulerApp =
+ schedulerApps.get(recoveredApp1.getApplicationId());
+
+ if (schedulerClass.equals(CapacityScheduler.class)) {
+ checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
+ } else if (schedulerClass.equals(FifoScheduler.class)) {
+ checkFifoQueue(schedulerApp, usedResources, availableResources);
+ }
+
+ // *********** check scheduler attempt state.********
+ SchedulerApplicationAttempt schedulerAttempt =
+ schedulerApp.getCurrentAppAttempt();
+ assertTrue(schedulerAttempt.getLiveContainers().contains(
+ scheduler.getRMContainer(amContainer.getContainerId())));
+ assertTrue(schedulerAttempt.getLiveContainers().contains(
+ scheduler.getRMContainer(runningContainer.getContainerId())));
+ assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
+ assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+ // *********** check appSchedulingInfo state ***********
+ assertEquals(4, schedulerAttempt.getNewContainerId());
+ }
+
+ private void checkCSQueue(MockRM rm,
+ SchedulerApplication<SchedulerApplicationAttempt> app,
+ Resource clusterResource, Resource queueResource, Resource usedResource,
+ int numContainers)
+ throws Exception {
+ checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource,
+ numContainers);
+
+ LeafQueue queue = (LeafQueue) app.getQueue();
+ Resource availableResources = Resources.subtract(queueResource, usedResource);
+ // ************* check Queue metrics ************
+ QueueMetrics queueMetrics = queue.getMetrics();
+ asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+ availableResources.getVirtualCores(), usedResource.getMemory(),
+ usedResource.getVirtualCores());
+
+ // ************ check user metrics ***********
+ QueueMetrics userMetrics =
+ queueMetrics.getUserMetrics(app.getUser());
+ asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+ availableResources.getVirtualCores(), usedResource.getMemory(),
+ usedResource.getVirtualCores());
+ }
+
+ private void checkCSLeafQueue(MockRM rm,
+ SchedulerApplication<SchedulerApplicationAttempt> app,
+ Resource clusterResource, Resource queueResource, Resource usedResource,
+ int numContainers) {
+ LeafQueue leafQueue = (LeafQueue) app.getQueue();
+ // assert queue used resources.
+ assertEquals(usedResource, leafQueue.getUsedResources());
+ assertEquals(numContainers, leafQueue.getNumContainers());
+
+ ResourceCalculator calc =
+ ((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator();
+ float usedCapacity =
+ Resources.divide(calc, clusterResource, usedResource, queueResource);
+ // assert queue used capacity
+ assertEquals(usedCapacity, leafQueue.getUsedCapacity(), 1e-8);
+ float absoluteUsedCapacity =
+ Resources.divide(calc, clusterResource, usedResource, clusterResource);
+ // assert queue absolute capacity
+ assertEquals(absoluteUsedCapacity, leafQueue.getAbsoluteUsedCapacity(),
+ 1e-8);
+ // assert user consumed resources.
+ assertEquals(usedResource, leafQueue.getUser(app.getUser())
+ .getConsumedResources());
+ }
+
+ private void checkFifoQueue(SchedulerApplication schedulerApp,
+ Resource usedResources, Resource availableResources) throws Exception {
+ FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
+ // ************ check cluster used Resources ********
+ assertEquals(usedResources, scheduler.getUsedResource());
+
+ // ************ check app headroom ****************
+ SchedulerApplicationAttempt schedulerAttempt =
+ schedulerApp.getCurrentAppAttempt();
+ assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+ // ************ check queue metrics ****************
+ QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
+ asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+ availableResources.getVirtualCores(), usedResources.getMemory(),
+ usedResources.getVirtualCores());
+ }
+
+ // create 3 container reports for AM
+ public static List<NMContainerStatus>
+ createNMContainerStatusForApp(MockAM am) {
+ List<NMContainerStatus> list =
+ new ArrayList<NMContainerStatus>();
+ NMContainerStatus amContainer =
+ TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 1,
+ ContainerState.RUNNING);
+ NMContainerStatus runningContainer =
+ TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 2,
+ ContainerState.RUNNING);
+ NMContainerStatus completedContainer =
+ TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 3,
+ ContainerState.COMPLETE);
+ list.add(amContainer);
+ list.add(runningContainer);
+ list.add(completedContainer);
+ return list;
+ }
+
+ private static final String R = "Default";
+ private static final String A = "QueueA";
+ private static final String B = "QueueB";
+ private static final String USER_1 = "user1";
+ private static final String USER_2 = "user2";
+
+ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
+ final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
+ conf.setCapacity(Q_R, 100);
+ final String Q_A = Q_R + "." + A;
+ final String Q_B = Q_R + "." + B;
+ conf.setQueues(Q_R, new String[] {A, B});
+ conf.setCapacity(Q_A, 50);
+ conf.setCapacity(Q_B, 50);
+ conf.setDouble(CapacitySchedulerConfiguration
+ .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
+ }
+
+ // Test CS recovery with multi-level queues and multi-users:
+ // 1. setup 2 NMs each with 8GB memory;
+ // 2. setup 2 level queues: Default -> (QueueA, QueueB)
+ // 3. User1 submits 2 apps on QueueA
+ // 4. User2 submits 1 app on QueueB
+ // 5. AM and each container has 1GB memory
+ // 6. Restart RM.
+ // 7. nm1 re-syncs back containers belong to user1
+ // 8. nm2 re-syncs back containers belong to user2.
+ // 9. Assert the parent queue and 2 leaf queues state and the metrics.
+ // 10. Assert each user's consumption inside the queue.
+ @Test (timeout = 30000)
+ public void testCapacitySchedulerRecovery() throws Exception {
+ if (!schedulerClass.equals(CapacityScheduler.class)) {
+ return;
+ }
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+ conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ setupQueueConfiguration(csConf);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(csConf);
+ rm1 = new MockRM(csConf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ MockNM nm2 =
+ new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm2.registerNode();
+ RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
+ MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+ RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
+ MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
+
+ RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // clear queue metrics
+ rm1.clearQueueMetrics(app1_1);
+ rm1.clearQueueMetrics(app1_2);
+ rm1.clearQueueMetrics(app2);
+
+ // Re-start RM
+ rm2 = new MockRM(csConf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ List<NMContainerStatus> am1_1Containers =
+ createNMContainerStatusForApp(am1_1);
+ List<NMContainerStatus> am1_2Containers =
+ createNMContainerStatusForApp(am1_2);
+ am1_1Containers.addAll(am1_2Containers);
+ nm1.registerNode(am1_1Containers);
+
+ List<NMContainerStatus> am2Containers =
+ createNMContainerStatusForApp(am2);
+ nm2.registerNode(am2Containers);
+
+ // Wait for RM to settle down on recovering containers;
+ waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+ waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
+ waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
+
+ // Calculate each queue's resource usage.
+ Resource containerResource = Resource.newInstance(1024, 1);
+ Resource nmResource =
+ Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+ Resource clusterResource = Resources.multiply(nmResource, 2);
+ Resource q1Resource = Resources.multiply(clusterResource, 0.5);
+ Resource q2Resource = Resources.multiply(clusterResource, 0.5);
+ Resource q1UsedResource = Resources.multiply(containerResource, 4);
+ Resource q2UsedResource = Resources.multiply(containerResource, 2);
+ Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource);
+ Resource q1availableResources =
+ Resources.subtract(q1Resource, q1UsedResource);
+ Resource q2availableResources =
+ Resources.subtract(q2Resource, q2UsedResource);
+ Resource totalAvailableResource =
+ Resources.add(q1availableResources, q2availableResources);
+
+ Map<ApplicationId, SchedulerApplication> schedulerApps =
+ ((AbstractYarnScheduler) rm2.getResourceScheduler())
+ .getSchedulerApplications();
+ SchedulerApplication schedulerApp1_1 =
+ schedulerApps.get(app1_1.getApplicationId());
+
+ // assert queue A state.
+ checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
+ q1UsedResource, 4);
+ QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
+ asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
+ q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
+ q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
+
+ // assert queue B state.
+ SchedulerApplication schedulerApp2 =
+ schedulerApps.get(app2.getApplicationId());
+ checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
+ q2UsedResource, 2);
+ QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
+ asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
+ q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
+ q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
+
+ // assert parent queue state.
+ LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
+ ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
+ checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
+ (float) 6 / 16);
+ asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
+ totalAvailableResource.getMemory(),
+ totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
+ totalUsedResource.getVirtualCores());
+ }
+
+ private void checkParentQueue(ParentQueue parentQueue, int numContainers,
+ Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {
+ assertEquals(numContainers, parentQueue.getNumContainers());
+ assertEquals(usedResource, parentQueue.getUsedResources());
+ assertEquals(UsedCapacity, parentQueue.getUsedCapacity(), 1e-8);
+ assertEquals(absoluteUsedCapacity, parentQueue.getAbsoluteUsedCapacity(), 1e-8);
+ }
+
+ // Test RM shuts down, in the meanwhile, AM fails. Restarted RM scheduler
+ // should not recover the containers that belong to the failed AM.
+ @Test(timeout = 20000)
+ public void testAMfailedBetweenRMRestart() throws Exception {
+ // In-memory store so rm2 can recover the app state written by rm1.
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Restart the RM and re-point the NM at the new instance.
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // The NM re-registers reporting the AM container as COMPLETE (container 1),
+ // which makes the recovered attempt transition to FAILED, plus one RUNNING
+ // and one COMPLETE app container (2 and 3).
+ NMContainerStatus amContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+ ContainerState.COMPLETE);
+ NMContainerStatus runningContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+ ContainerState.RUNNING);
+ NMContainerStatus completedContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+ ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+ completedContainer));
+ rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ // Wait for RM to settle down on recovering containers;
+ // NOTE(review): fixed sleep is a timing assumption — if recovery/cleanup
+ // takes longer than 3s on a loaded machine this test may flake.
+ Thread.sleep(3000);
+
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm2.getResourceScheduler();
+ // Previous AM failed, The failed AM should once again release the
+ // just-recovered containers.
+ assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+ assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+ }
+
+ // Apps already completed before RM restart. Restarted RM scheduler should not
+ // recover containers for completed apps.
+ @Test(timeout = 20000)
+ public void testContainersNotRecoveredForCompletedApps() throws Exception {
+ // In-memory store so rm2 can recover the app state written by rm1.
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ // App finishes cleanly BEFORE the restart.
+ MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
+
+ // Restart the RM; NM re-registers with stale container statuses for the
+ // already-finished app.
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus runningContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+ ContainerState.RUNNING);
+ NMContainerStatus completedContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+ ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(runningContainer, completedContainer));
+ // The recovered app must come back in its terminal FINISHED state.
+ RMApp recoveredApp1 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
+
+ // Wait for RM to settle down on recovering containers;
+ // NOTE(review): fixed sleep is a timing assumption — may flake if the
+ // scheduler takes longer than 3s to process the re-registration.
+ Thread.sleep(3000);
+
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm2.getResourceScheduler();
+
+ // scheduler should not recover containers for finished apps.
+ assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+ assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+ }
+
+ // Asserts the full set of queue metrics in one call: app lifecycle counts
+ // (submitted/pending/running/completed), allocated container count, and the
+ // available vs. allocated memory (MB) and virtual cores.
+ // NOTE(review): "asserteMetrics" looks like a typo for "assertMetrics";
+ // renaming would touch every call site in this class, so left as-is here.
+ private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
+ int appsPending, int appsRunning, int appsCompleted,
+ int allocatedContainers, int availableMB, int availableVirtualCores,
+ int allocatedMB, int allocatedVirtualCores) {
+ assertEquals(appsSubmitted, qm.getAppsSubmitted());
+ assertEquals(appsPending, qm.getAppsPending());
+ assertEquals(appsRunning, qm.getAppsRunning());
+ assertEquals(appsCompleted, qm.getAppsCompleted());
+ assertEquals(allocatedContainers, qm.getAllocatedContainers());
+ assertEquals(availableMB, qm.getAvailableMB());
+ assertEquals(availableVirtualCores, qm.getAvailableVirtualCores());
+ assertEquals(allocatedMB, qm.getAllocatedMB());
+ assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
+ }
+
+ // Polls (200ms interval) until the scheduler has created the attempt for
+ // attemptId and that attempt reports at least num recovered live containers.
+ // NOTE(review): both loops are unbounded — if recovery never happens this
+ // spins until the calling test's @Test(timeout) aborts it; there is no
+ // failure message pointing at this helper.
+ private void waitForNumContainersToRecover(int num, MockRM rm,
+ ApplicationAttemptId attemptId) throws Exception {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+ SchedulerApplicationAttempt attempt =
+ scheduler.getApplicationAttempt(attemptId);
+ // First wait for the scheduler-side attempt object to exist at all.
+ while (attempt == null) {
+ System.out.println("Wait for scheduler attempt " + attemptId
+ + " to be created");
+ Thread.sleep(200);
+ attempt = scheduler.getApplicationAttempt(attemptId);
+ }
+ // Then wait for the expected number of containers to be re-attached.
+ while (attempt.getLiveContainers().size() < num) {
+ System.out.println("Wait for " + num + " containers to recover.");
+ Thread.sleep(200);
+ }
+ }
+}