You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [20/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Tue Aug 19 23:49:39 2014
@@ -18,11 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,18 +36,19 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -56,17 +60,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
public class TestResourceTrackerService {
private final static File TEMP_DIR = new File(System.getProperty(
@@ -487,33 +484,37 @@ public class TestResourceTrackerService
RMApp app = rm.submitApp(1024, true);
// Case 1.1: AppAttemptId is null
- ContainerStatus status = ContainerStatus.newInstance(
- ContainerId.newInstance(ApplicationAttemptId.newInstance(
- app.getApplicationId(), 2), 1),
- ContainerState.COMPLETE, "Dummy Completed", 0);
- rm.getResourceTrackerService().handleContainerStatus(status);
+ NMContainerStatus report =
+ NMContainerStatus.newInstance(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0, Priority.newInstance(10), 1234);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
verify(handler, never()).handle((Event) any());
// Case 1.2: Master container is null
RMAppAttemptImpl currentAttempt =
(RMAppAttemptImpl) app.getCurrentAppAttempt();
currentAttempt.setMasterContainer(null);
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
- ContainerState.COMPLETE, "Dummy Completed", 0);
- rm.getResourceTrackerService().handleContainerStatus(status);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0, Priority.newInstance(10), 1234);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
verify(handler, never()).handle((Event)any());
// Case 2: Managed AM
app = rm.submitApp(1024);
// Case 2.1: AppAttemptId is null
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(ApplicationAttemptId.newInstance(
- app.getApplicationId(), 2), 1),
- ContainerState.COMPLETE, "Dummy Completed", 0);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0, Priority.newInstance(10), 1234);
try {
- rm.getResourceTrackerService().handleContainerStatus(status);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
} catch (Exception e) {
// expected - ignore
}
@@ -523,11 +524,12 @@ public class TestResourceTrackerService
currentAttempt =
(RMAppAttemptImpl) app.getCurrentAppAttempt();
currentAttempt.setMasterContainer(null);
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
- ContainerState.COMPLETE, "Dummy Completed", 0);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0, Priority.newInstance(10), 1234);
try {
- rm.getResourceTrackerService().handleContainerStatus(status);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
} catch (Exception e) {
// expected - ignore
}
@@ -593,7 +595,7 @@ public class TestResourceTrackerService
// reconnect of node with changed capability
nm1 = rm.registerNode("host2:5678", 10240);
dispatcher.await();
- response = nm2.nodeHeartbeat(true);
+ response = nm1.nodeHeartbeat(true);
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014
@@ -165,7 +165,7 @@ public class TestRMApplicationHistoryWri
when(container.getAllocatedResource()).thenReturn(
Resource.newInstance(-1, -1));
when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
- when(container.getStartTime()).thenReturn(0L);
+ when(container.getCreationTime()).thenReturn(0L);
when(container.getFinishTime()).thenReturn(1L);
when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
when(container.getLogURL()).thenReturn("test log url");
@@ -281,7 +281,7 @@ public class TestRMApplicationHistoryWri
Assert.assertEquals(Resource.newInstance(-1, -1),
containerHD.getAllocatedResource());
Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
- Assert.assertEquals(0L, container.getStartTime());
+ Assert.assertEquals(0L, container.getCreationTime());
writer.containerFinished(container);
for (int i = 0; i < MAX_RETRIES; ++i) {
@@ -420,7 +420,7 @@ public class TestRMApplicationHistoryWri
int waitCount = 0;
int allocatedSize = allocated.size();
while (allocatedSize < request && waitCount++ < 200) {
- Thread.sleep(100);
+ Thread.sleep(300);
allocated =
am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Tue Aug 19 23:49:39 2014
@@ -31,10 +31,13 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -159,6 +162,16 @@ public abstract class MockAsm extends Mo
public YarnApplicationState createApplicationState() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public Set<NodeId> getRanNodes() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public RMAppMetrics getRMAppMetrics() {
+ return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0);
+ }
}
public static RMApp newApplication(int i) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Tue Aug 19 23:49:39 2014
@@ -19,41 +19,47 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-
-import org.junit.Assert;
+import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
import org.junit.Test;
-/**
- * Test to restart the AM on failure.
- *
- */
public class TestAMRestart {
- @Test
+ @Test(timeout = 30000)
public void testAMRestartWithExistingContainers() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -123,9 +129,9 @@ public class TestAMRestart {
ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
nm1.nodeHeartbeat(true);
SchedulerApplicationAttempt schedulerAttempt =
- ((CapacityScheduler) rm1.getResourceScheduler())
+ ((AbstractYarnScheduler) rm1.getResourceScheduler())
.getCurrentAttemptForContainer(containerId6);
- while (schedulerAttempt.getReservedContainers().size() == 0) {
+ while (schedulerAttempt.getReservedContainers().isEmpty()) {
System.out.println("Waiting for container " + containerId6
+ " to be reserved.");
nm1.nodeHeartbeat(true);
@@ -219,7 +225,7 @@ public class TestAMRestart {
// record the scheduler attempt for testing.
SchedulerApplicationAttempt schedulerNewAttempt =
- ((CapacityScheduler) rm1.getResourceScheduler())
+ ((AbstractYarnScheduler) rm1.getResourceScheduler())
.getCurrentAttemptForContainer(containerId2);
// finish this application
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
@@ -245,7 +251,7 @@ public class TestAMRestart {
}
}
- @Test
+ @Test(timeout = 30000)
public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
@@ -264,31 +270,36 @@ public class TestAMRestart {
nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
- int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
- // am1 allocate 1 container on nm1.
+ // am1 allocate 2 container on nm1.
+ // first container
while (true) {
AllocateResponse response =
- am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+ am1.allocate("127.0.0.1", 2000, 2,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens());
- if (containers.size() == NUM_CONTAINERS) {
+ if (containers.size() == 2) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
- // launch the container
+ // launch the container-2
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
+ // launch the container-3
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
+ ContainerId containerId3 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
+ rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+
// fail am1
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
@@ -308,12 +319,12 @@ public class TestAMRestart {
containers = new ArrayList<Container>();
while (true) {
AllocateResponse allocateResponse =
- am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+ am2.allocate("127.1.1.1", 4000, 1,
new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true);
containers.addAll(allocateResponse.getAllocatedContainers());
expectedNMTokens.addAll(allocateResponse.getNMTokens());
- if (containers.size() == NUM_CONTAINERS) {
+ if (containers.size() == 1) {
break;
}
Thread.sleep(200);
@@ -340,4 +351,237 @@ public class TestAMRestart {
Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
rm1.stop();
}
+
+ // AM container preempted, nm disk failure
+ // should not be counted towards AM max retry count.
+ @Test(timeout = 100000)
+ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+ // Preempt the first attempt;
+ scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+ am1.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ ApplicationState appState =
+ memStore.getState().getApplicationState().get(app1.getApplicationId());
+ // AM should be restarted even though max-am-attempt is 1.
+ MockAM am2 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+ // Preempt the second attempt.
+ ContainerId amContainer2 =
+ ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
+ scheduler.killContainer(scheduler.getRMContainer(amContainer2));
+
+ am2.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am3 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
+ RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+
+ // mimic NM disk_failure
+ ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+ containerStatus.setContainerId(attempt3.getMasterContainer().getId());
+ containerStatus.setDiagnostics("mimic NM disk_failure");
+ containerStatus.setState(ContainerState.COMPLETE);
+ containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
+ Map<ApplicationId, List<ContainerStatus>> conts =
+ new HashMap<ApplicationId, List<ContainerStatus>>();
+ conts.put(app1.getApplicationId(),
+ Collections.singletonList(containerStatus));
+ nm1.nodeHeartbeat(conts, true);
+
+ am3.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+ Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
+ appState.getAttempt(am3.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am4 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+ RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // create second NM, and register to rm1
+ MockNM nm2 =
+ new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService());
+ nm2.registerNode();
+ // nm1 heartbeats to report unhealthy
+ // This will mimic ContainerExitStatus.ABORT
+ nm1.nodeHeartbeat(false);
+ am4.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
+ Assert.assertEquals(ContainerExitStatus.ABORTED,
+ appState.getAttempt(am4.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+ // launch next AM in nm2
+ nm2.nodeHeartbeat(true);
+ MockAM am5 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
+ RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
+ // fail the AM normally
+ nm2
+ .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am5.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
+
+ // AM should not be restarted.
+ rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+ Assert.assertEquals(5, app1.getAppAttempts().size());
+ rm1.stop();
+ }
+
+ // Test RM restarts after AM container is preempted, new RM should not count
+ // AM preemption failure towards the max-retry-account and should be able to
+ // re-launch the AM.
+ @Test(timeout = 20000)
+ public void testPreemptedAMRestartOnRMRestart() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+
+ // Forcibly preempt the am container;
+ scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+ am1.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // state store has 1 attempt stored.
+ ApplicationState appState =
+ memStore.getState().getApplicationState().get(app1.getApplicationId());
+ Assert.assertEquals(1, appState.getAttemptCount());
+ // attempt stored has the preempted container exit status.
+ Assert.assertEquals(ContainerExitStatus.PREEMPTED,
+ appState.getAttempt(am1.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+ // Restart rm.
+ MockRM rm2 = new MockRM(conf, memStore);
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1.registerNode();
+ rm2.start();
+
+ // Restarted RM should re-launch the am.
+ MockAM am2 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+ MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+ RMAppAttempt attempt2 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+ .getCurrentAppAttempt();
+ Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
+ Assert.assertEquals(ContainerExitStatus.INVALID,
+ appState.getAttempt(am2.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+ rm1.stop();
+ rm2.stop();
+ }
+
+ // Test regular RM restart/failover, new RM should not count
+ // AM failure towards the max-retry-account and should be able to
+ // re-launch the AM.
+ @Test(timeout = 50000)
+ public void testRMRestartOrFailoverNotCountedForAMFailures()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ // AM should be restarted even though max-am-attempt is 1.
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
+
+ // Restart rm.
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ ApplicationState appState =
+ memStore.getState().getApplicationState().get(app1.getApplicationId());
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status
+ .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt1.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
+ Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ appState.getAttempt(am1.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+ // Will automatically start a new AppAttempt in rm2
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am2 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+ MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+ RMAppAttempt attempt3 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+ .getCurrentAppAttempt();
+ Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
+ Assert.assertEquals(ContainerExitStatus.INVALID,
+ appState.getAttempt(am2.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+
+ rm1.stop();
+ rm2.stop();
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,25 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
@@ -27,12 +46,16 @@ import java.util.Random;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -52,22 +75,13 @@ import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
int appAlloc = 0;
+ boolean setAMContainer = false;
+ float setAMResourcePercent = 0.0f;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
@@ -115,6 +129,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -133,6 +148,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
+ { 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
@@ -144,12 +160,33 @@ public class TestProportionalCapacityPre
policy.editSchedule();
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
+
+ @Test
+ public void testMaxCap() {
+ int[][] qData = new int[][]{
+ // / A B C
+ { 100, 40, 40, 20 }, // abs
+ { 100, 100, 45, 100 }, // maxCap
+ { 100, 55, 45, 0 }, // used
+ { 20, 10, 10, 0 }, // pending
+ { 0, 0, 0, 0 }, // reserved
+ { 2, 1, 1, 0 }, // apps
+ { -1, 1, 1, 0 }, // req granularity
+ { 3, 0, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // despite the imbalance, since B is at maxCap, do not correct
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -169,6 +206,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -205,6 +243,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -224,6 +263,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -242,6 +282,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -261,6 +302,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -280,6 +322,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -295,10 +338,54 @@ public class TestProportionalCapacityPre
}
@Test
+ public void testZeroGuar() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 80, 60, 20, 90, 90, 0 }, // used
+ { 10, 0, 0, 0, 10, 0, 10 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 2, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // verify capacity taken from A1, not B1 despite B1 being far over
+ // its absolute guaranteed capacity
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
+ @Test
+ public void testZeroGuarOverCap() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 0, 100, 100 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 170, 60, 20, 90, 0, 0 }, // used
+ { 85, 50, 30, 10, 10, 20, 20 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 3, 1, 1, 1, 1, 1 }, // apps
+ { -1, -1, 1, 1, 1, -1, 1 }, // req granularity
+ { 2, 3, 0, 0, 0, 1, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+    // we verify both that C has priority over B and D (as it has >0 guarantees)
+    // and that B and D are forced to share their over-capacity fairly (as they
+    // are both zero-guarantee) hence D sees some of its containers preempted
+ verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+
+
+ @Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
- { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
{ 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -351,7 +438,138 @@ public class TestProportionalCapacityPre
assert containers.get(4).equals(rm5);
}
-
+
+ @Test
+ public void testPolicyInitializeAfterSchedulerInitialized() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+
+ @SuppressWarnings("resource")
+ MockRM rm = new MockRM(conf);
+ rm.init(conf);
+
+ // ProportionalCapacityPreemptionPolicy should be initialized after
+ // CapacityScheduler initialized. We will
+ // 1) find SchedulingMonitor from RMActiveService's service list,
+ // 2) check if ResourceCalculator in policy is null or not.
+ // If it's not null, we can come to a conclusion that policy initialized
+ // after scheduler got initialized
+ for (Service service : rm.getRMActiveService().getServices()) {
+ if (service instanceof SchedulingMonitor) {
+ ProportionalCapacityPreemptionPolicy policy =
+ (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
+ .getSchedulingEditPolicy();
+ assertNotNull(policy.getResourceCalculator());
+ return;
+ }
+ }
+
+ fail("Failed to find SchedulingMonitor service, please check what happened");
+ }
+
+ @Test
+ public void testSkipAMContainer() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 50, 50 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 50 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 1, 1 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+ // By skipping AM Container, all other 24 containers of appD will be
+ // preempted
+ verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+ // By skipping AM Container, all other 24 containers of appC will be
+ // preempted
+ verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // Since AM containers of appC and appD are saved, 2 containers from appB
+ // has to be preempted.
+ verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+ setAMContainer = false;
+ }
+
+ @Test
+ public void testPreemptSkippedAMContainers() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 10, 90 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 90 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 5, 5 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+ // All 5 containers of appD will be preempted including AM container.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+ // All 5 containers of appC will be preempted including AM container.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // By skipping AM Container, all other 4 containers of appB will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // By skipping AM Container, all other 4 containers of appA will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ setAMContainer = false;
+ }
+
+ @Test
+ public void testAMResourcePercentForSkippedAMContainers() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 10, 90 }, // abs
+ { 100, 100, 100 }, // maxcap
+ { 100, 100, 0 }, // used
+ { 70, 20, 90 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 5, 5 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ setAMResourcePercent = 0.5f;
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+
+    // AMResourcePercent is 50% of cluster and maxAMCapacity will be 5GB.
+    // Total used AM container size is 20GB, hence 2 AM containers have
+    // to be preempted as Queue Capacity is 10GB.
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // Including the AM container, all 5 containers of appC will be
+    // preempted
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // By skipping AM Container, all other 4 containers of appB will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // By skipping AM Container, all other 4 containers of appA will be
+ // preempted
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ setAMContainer = false;
+ }
+
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@@ -382,24 +600,25 @@ public class TestProportionalCapacityPre
when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources =
- Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
- when(mCS.getClusterResources()).thenReturn(clusterResources);
+ Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
+ when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
- int[] used = queueData[1];
- int[] pending = queueData[2];
- int[] reserved = queueData[3];
- int[] apps = queueData[4];
- int[] gran = queueData[5];
- int[] queues = queueData[6];
+ int[] maxCap = queueData[1];
+ int[] used = queueData[2];
+ int[] pending = queueData[3];
+ int[] reserved = queueData[4];
+ int[] apps = queueData[5];
+ int[] gran = queueData[6];
+ int[] queues = queueData[7];
- return mockNested(abs, used, pending, reserved, apps, gran, queues);
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
- ParentQueue mockNested(int[] abs, int[] used,
+ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@@ -407,6 +626,8 @@ public class TestProportionalCapacityPre
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
@@ -420,6 +641,7 @@ public class TestProportionalCapacityPre
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
}
assert 0 == pqs.size();
return root;
@@ -439,7 +661,7 @@ public class TestProportionalCapacityPre
return pq;
}
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
when(lq.getTotalResourcePending()).thenReturn(
@@ -464,6 +686,9 @@ public class TestProportionalCapacityPre
}
}
when(lq.getApplications()).thenReturn(qApps);
+ if(setAMResourcePercent != 0.0f){
+ when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+ }
p.getChildQueues().add(lq);
return lq;
}
@@ -488,7 +713,11 @@ public class TestProportionalCapacityPre
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
- cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+ if(setAMContainer && i == 0){
+ cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
+ }else{
+ cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+ }
++cAlloc;
}
when(app.getLiveContainers()).thenReturn(cLive);
@@ -504,6 +733,10 @@ public class TestProportionalCapacityPre
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
+ when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+ if(0 == priority){
+ when(mC.isAMContainer()).thenReturn(true);
+ }
return mC;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
import javax.crypto.SecretKey;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -55,18 +55,21 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -74,10 +77,9 @@ public class RMStateStoreTestBase extend
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
static class TestDispatcher implements
- Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
+ Dispatcher, EventHandler<RMAppAttemptEvent> {
ApplicationAttemptId attemptId;
- Exception storedException;
boolean notified = false;
@@ -88,9 +90,8 @@ public class RMStateStoreTestBase extend
}
@Override
- public void handle(RMAppAttemptNewSavedEvent event) {
+ public void handle(RMAppAttemptEvent event) {
assertEquals(attemptId, event.getApplicationAttemptId());
- assertEquals(storedException, event.getStoredException());
notified = true;
synchronized (this) {
notifyAll();
@@ -108,8 +109,8 @@ public class RMStateStoreTestBase extend
interface RMStateStoreHelper {
RMStateStore getRMStateStore() throws Exception;
boolean isFinalStateValid() throws Exception;
- void writeVersion(RMStateVersion version) throws Exception;
- RMStateVersion getCurrentVersion() throws Exception;
+ void writeVersion(Version version) throws Exception;
+ Version getCurrentVersion() throws Exception;
boolean appExists(RMApp app) throws Exception;
}
@@ -160,7 +161,6 @@ public class RMStateStoreTestBase extend
when(mockAttempt.getClientTokenMasterKey())
.thenReturn(clientTokenMasterKey);
dispatcher.attemptId = attemptId;
- dispatcher.storedException = null;
store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher);
return container.getId();
@@ -175,8 +175,15 @@ public class RMStateStoreTestBase extend
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+
AMRMTokenSecretManager appTokenMgr =
- new AMRMTokenSecretManager(conf);
+ spy(new AMRMTokenSecretManager(conf, rmContext));
+
+ MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+ when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
@@ -188,8 +195,6 @@ public class RMStateStoreTestBase extend
// create application token and client token key for attempt1
Token<AMRMTokenIdentifier> appAttemptToken1 =
generateAMRMToken(attemptId1, appTokenMgr);
- HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
- attemptTokenSet1.add(appAttemptToken1);
SecretKey clientTokenKey1 =
clientToAMTokenMgr.createMasterKey(attemptId1);
@@ -204,8 +209,6 @@ public class RMStateStoreTestBase extend
// create application token and client token key for attempt2
Token<AMRMTokenIdentifier> appAttemptToken2 =
generateAMRMToken(attemptId2, appTokenMgr);
- HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
- attemptTokenSet2.add(appAttemptToken2);
SecretKey clientTokenKey2 =
clientToAMTokenMgr.createMasterKey(attemptId2);
@@ -267,12 +270,9 @@ public class RMStateStoreTestBase extend
// attempt1 is loaded correctly
assertNotNull(attemptState);
assertEquals(attemptId1, attemptState.getAttemptId());
+ assertEquals(-1000, attemptState.getAMContainerExitStatus());
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
- // attempt1 applicationToken is loaded correctly
- HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
- savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
- assertEquals(attemptTokenSet1, savedTokens);
// attempt1 client token master key is loaded correctly
assertArrayEquals(clientTokenKey1.getEncoded(),
attemptState.getAppAttemptCredentials()
@@ -284,10 +284,6 @@ public class RMStateStoreTestBase extend
assertEquals(attemptId2, attemptState.getAttemptId());
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
- // attempt2 applicationToken is loaded correctly
- savedTokens.clear();
- savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
- assertEquals(attemptTokenSet2, savedTokens);
// attempt2 client token master key is loaded correctly
assertArrayEquals(clientTokenKey2.getEncoded(),
attemptState.getAppAttemptCredentials()
@@ -308,7 +304,7 @@ public class RMStateStoreTestBase extend
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED);
+ FinalApplicationStatus.SUCCEEDED, 100);
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@@ -331,7 +327,7 @@ public class RMStateStoreTestBase extend
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED);
+ FinalApplicationStatus.SUCCEEDED, 111);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down
@@ -370,6 +366,7 @@ public class RMStateStoreTestBase extend
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
+ assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
assertEquals(FinalApplicationStatus.SUCCEEDED,
updatedAttemptState.getFinalApplicationStatus());
@@ -453,10 +450,8 @@ public class RMStateStoreTestBase extend
private Token<AMRMTokenIdentifier> generateAMRMToken(
ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr) {
- AMRMTokenIdentifier appTokenId =
- new AMRMTokenIdentifier(attemptId);
Token<AMRMTokenIdentifier> appToken =
- new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+ appTokenMgr.createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
return appToken;
}
@@ -467,13 +462,13 @@ public class RMStateStoreTestBase extend
store.setRMDispatcher(new TestDispatcher());
// default version
- RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion();
+ Version defaultVersion = stateStoreHelper.getCurrentVersion();
store.checkVersion();
Assert.assertEquals(defaultVersion, store.loadVersion());
// compatible version
- RMStateVersion compatibleVersion =
- RMStateVersion.newInstance(defaultVersion.getMajorVersion(),
+ Version compatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
stateStoreHelper.writeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, store.loadVersion());
@@ -482,8 +477,8 @@ public class RMStateStoreTestBase extend
Assert.assertEquals(defaultVersion, store.loadVersion());
// incompatible version
- RMStateVersion incompatibleVersion =
- RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2,
+ Version incompatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion() + 2,
defaultVersion.getMinorVersion());
stateStoreHelper.writeVersion(incompatibleVersion);
try {
@@ -493,21 +488,53 @@ public class RMStateStoreTestBase extend
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
}
}
+
+ public void testEpoch(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ store.setRMDispatcher(new TestDispatcher());
+
+ int firstTimeEpoch = store.getAndIncrementEpoch();
+ Assert.assertEquals(0, firstTimeEpoch);
+
+ int secondTimeEpoch = store.getAndIncrementEpoch();
+ Assert.assertEquals(1, secondTimeEpoch);
+
+ int thirdTimeEpoch = store.getAndIncrementEpoch();
+ Assert.assertEquals(2, thirdTimeEpoch);
+ }
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(new TestDispatcher());
- // create and store apps
+ ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+
+ for (RMApp app : appList) {
+ // remove the app
+ store.removeApplication(app);
+ // wait for app to be removed.
+ while (true) {
+ if (!stateStoreHelper.appExists(app)) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+ }
+
+ private ArrayList<RMApp> createAndStoreApps(
+ RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
+ throws Exception {
ArrayList<RMApp> appList = new ArrayList<RMApp>();
- int NUM_APPS = 5;
- for (int i = 0; i < NUM_APPS; i++) {
+ for (int i = 0; i < numApps; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
- Assert.assertEquals(NUM_APPS, appList.size());
+ Assert.assertEquals(numApps, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
@@ -518,18 +545,17 @@ public class RMStateStoreTestBase extend
}
}
}
+ return appList;
+ }
+ public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+ store.deleteStore();
+ // verify apps deleted
for (RMApp app : appList) {
- // remove the app
- store.removeApplication(app);
- // wait for app to be removed.
- while (true) {
- if (!stateStoreHelper.appExists(app)) {
- break;
- } else {
- Thread.sleep(100);
- }
- }
+ Assert.assertFalse(stateStoreHelper.appExists(app));
}
}
@@ -541,4 +567,65 @@ public class RMStateStoreTestBase extend
}
+ public void testAMRMTokenSecretManagerStateStore(
+ RMStateStoreHelper stateStoreHelper) throws Exception {
+ System.out.println("Start testing");
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+ Configuration conf = new YarnConfiguration();
+ AMRMTokenSecretManager appTokenMgr =
+ new AMRMTokenSecretManager(conf, rmContext);
+
+ //create and save the first masterkey
+ MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey();
+
+ AMRMTokenSecretManagerState state1 =
+ AMRMTokenSecretManagerState.newInstance(
+ firstMasterKeyData.getMasterKey(), null);
+ rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state1,
+ false);
+
+ // load state
+ store = stateStoreHelper.getRMStateStore();
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ Assert.assertNotNull(state.getAMRMTokenSecretManagerState());
+ Assert.assertEquals(firstMasterKeyData.getMasterKey(), state
+ .getAMRMTokenSecretManagerState().getCurrentMasterKey());
+ Assert.assertNull(state
+ .getAMRMTokenSecretManagerState().getNextMasterKey());
+
+ //create and save the second masterkey
+ MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey();
+ AMRMTokenSecretManagerState state2 =
+ AMRMTokenSecretManagerState
+ .newInstance(firstMasterKeyData.getMasterKey(),
+ secondMasterKeyData.getMasterKey());
+ rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state2,
+ true);
+
+ // load state
+ store = stateStoreHelper.getRMStateStore();
+ store.setRMDispatcher(dispatcher);
+ RMState state_2 = store.loadState();
+ Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState());
+ Assert.assertEquals(firstMasterKeyData.getMasterKey(), state_2
+ .getAMRMTokenSecretManagerState().getCurrentMasterKey());
+ Assert.assertEquals(secondMasterKeyData.getMasterKey(), state_2
+ .getAMRMTokenSecretManagerState().getNextMasterKey());
+
+ // re-create the masterKeyData based on the recovered masterkey
+ // should have the same secretKey
+ appTokenMgr.recover(state_2);
+ Assert.assertEquals(appTokenMgr.getCurrnetMasterKeyData().getSecretKey(),
+ firstMasterKeyData.getSecretKey());
+ Assert.assertEquals(appTokenMgr.getNextMasterKeyData().getSecretKey(),
+ secondMasterKeyData.getSecretKey());
+
+ store.close();
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,9 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -71,7 +70,7 @@ public class TestFSRMStateStore extends
return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
}
- public RMStateVersion getCurrentVersion() {
+ public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@@ -112,13 +111,13 @@ public class TestFSRMStateStore extends
}
@Override
- public void writeVersion(RMStateVersion version) throws Exception {
- store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+ public void writeVersion(Version version) throws Exception {
+ store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
.getProto().toByteArray());
}
@Override
- public RMStateVersion getCurrentVersion() throws Exception {
+ public Version getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
@@ -158,7 +157,10 @@ public class TestFSRMStateStore extends
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
testCheckVersion(fsTester);
+ testEpoch(fsTester);
testAppDeletion(fsTester);
+ testDeleteStore(fsTester);
+ testAMRMTokenSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}
@@ -213,9 +215,8 @@ public class TestFSRMStateStore extends
try {
store.storeApplicationStateInternal(
ApplicationId.newInstance(100L, 1),
- (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
- .newApplicationStateData(111, 111, "user", null,
- RMAppState.ACCEPTED, "diagnostics", 333));
+ ApplicationStateData.newInstance(111, 111, "user", null,
+ RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix
// that separately.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -32,9 +32,9 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
@@ -69,7 +69,7 @@ public class TestZKRMStateStore extends
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
- public RMStateVersion getCurrentVersion() {
+ public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@@ -96,13 +96,13 @@ public class TestZKRMStateStore extends
}
@Override
- public void writeVersion(RMStateVersion version) throws Exception {
- client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+ public void writeVersion(Version version) throws Exception {
+ client.setData(store.getVersionNode(), ((VersionPBImpl) version)
.getProto().toByteArray(), -1);
}
@Override
- public RMStateVersion getCurrentVersion() throws Exception {
+ public Version getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
@@ -120,7 +120,10 @@ public class TestZKRMStateStore extends
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
testCheckVersion(zkTester);
+ testEpoch(zkTester);
testAppDeletion(zkTester);
+ testDeleteStore(zkTester);
+ testAMRMTokenSecretManagerStateStore(zkTester);
}
private Configuration createHARMConf(
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Tue Aug 19 23:49:39 2014
@@ -41,6 +41,7 @@ import java.security.NoSuchAlgorithmExce
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -203,7 +204,7 @@ public class TestZKRMStateStoreZKClientC
LOG.error(error, e);
fail(error);
}
- Assert.assertEquals("newBytes", new String(ret));
+ assertEquals("newBytes", new String(ret));
}
@Test(timeout = 20000)
@@ -232,7 +233,7 @@ public class TestZKRMStateStoreZKClientC
try {
byte[] ret = store.getDataWithRetries(path, false);
- Assert.assertEquals("bytes", new String(ret));
+ assertEquals("bytes", new String(ret));
} catch (Exception e) {
String error = "New session creation failed";
LOG.error(error, e);
@@ -281,4 +282,24 @@ public class TestZKRMStateStoreZKClientC
zkClientTester.getRMStateStore(conf);
}
+
+ @Test
+ public void testZKRetryInterval() throws Exception {
+ TestZKClient zkClientTester = new TestZKClient();
+ YarnConfiguration conf = new YarnConfiguration();
+
+ ZKRMStateStore store =
+ (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+ assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
+ store.zkRetryInterval);
+ store.stop();
+
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ store =
+ (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+ assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
+ YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
+ store.zkRetryInterval);
+ store.stop();
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -232,4 +234,18 @@ public class MockRMApp implements RMApp
public YarnApplicationState createApplicationState() {
return null;
}
+
+ @Override
+ public Set<NodeId> getRanNodes() {
+ return null;
+ }
+
+ public Resource getResourcePreempted() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public RMAppMetrics getRMAppMetrics() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Aug 19 23:49:39 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -192,7 +192,7 @@ public class TestRMAppTransitions {
this.rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
- null, new AMRMTokenSecretManager(conf),
+ null, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),
@@ -327,15 +327,15 @@ public class TestRMAppTransitions {
private void sendAppUpdateSavedEvent(RMApp application) {
RMAppEvent event =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(event);
rmDispatcher.await();
}
private void sendAttemptUpdateSavedEvent(RMApp application) {
application.getCurrentAppAttempt().handle(
- new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
- .getAppAttemptId(), null));
+ new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
}
protected RMApp testCreateAppNewSaving(
@@ -356,7 +356,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppNewSaving(submissionContext);
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
RMAppEvent event =
- new RMAppNewSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@@ -421,7 +421,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppFinalSaving(submissionContext);
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
RMAppEvent appUpdated =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(appUpdated);
assertAppState(RMAppState.FINISHING, application);
assertTimesAtFinish(application);
@@ -762,7 +762,7 @@ public class TestRMAppTransitions {
application.handle(event);
assertAppState(RMAppState.FINAL_SAVING, application);
RMAppEvent appUpdated =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(appUpdated);
assertAppState(RMAppState.FINISHED, application);
@@ -921,6 +921,7 @@ public class TestRMAppTransitions {
assertAppState(RMAppState.NEW, app);
ApplicationReport report = app.createAndGetApplicationReport(null, true);
Assert.assertNotNull(report.getApplicationResourceUsageReport());
+ Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNotNull(report.getApplicationResourceUsageReport());
}