You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [20/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Tue Aug 19 23:49:39 2014
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -33,18 +36,19 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -56,17 +60,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
 public class TestResourceTrackerService {
 
   private final static File TEMP_DIR = new File(System.getProperty(
@@ -487,33 +484,37 @@ public class TestResourceTrackerService 
     RMApp app = rm.submitApp(1024, true);
 
     // Case 1.1: AppAttemptId is null
-    ContainerStatus status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    NMContainerStatus report =
+        NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event) any());
 
     // Case 1.2: Master container is null
     RMAppAttemptImpl currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event)any());
 
     // Case 2: Managed AM
     app = rm.submitApp(1024);
 
     // Case 2.1: AppAttemptId is null
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -523,11 +524,12 @@ public class TestResourceTrackerService 
     currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+      ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+      ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+      "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -593,7 +595,7 @@ public class TestResourceTrackerService 
     // reconnect of node with changed capability
     nm1 = rm.registerNode("host2:5678", 10240);
     dispatcher.await();
-    response = nm2.nodeHeartbeat(true);
+    response = nm1.nodeHeartbeat(true);
     dispatcher.await();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014
@@ -165,7 +165,7 @@ public class TestRMApplicationHistoryWri
     when(container.getAllocatedResource()).thenReturn(
       Resource.newInstance(-1, -1));
     when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
-    when(container.getStartTime()).thenReturn(0L);
+    when(container.getCreationTime()).thenReturn(0L);
     when(container.getFinishTime()).thenReturn(1L);
     when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
     when(container.getLogURL()).thenReturn("test log url");
@@ -281,7 +281,7 @@ public class TestRMApplicationHistoryWri
     Assert.assertEquals(Resource.newInstance(-1, -1),
       containerHD.getAllocatedResource());
     Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
-    Assert.assertEquals(0L, container.getStartTime());
+    Assert.assertEquals(0L, container.getCreationTime());
 
     writer.containerFinished(container);
     for (int i = 0; i < MAX_RETRIES; ++i) {
@@ -420,7 +420,7 @@ public class TestRMApplicationHistoryWri
     int waitCount = 0;
     int allocatedSize = allocated.size();
     while (allocatedSize < request && waitCount++ < 200) {
-      Thread.sleep(100);
+      Thread.sleep(300);
       allocated =
           am.allocate(new ArrayList<ResourceRequest>(),
             new ArrayList<ContainerId>()).getAllocatedContainers();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Tue Aug 19 23:49:39 2014
@@ -31,10 +31,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -159,6 +162,16 @@ public abstract class MockAsm extends Mo
     public YarnApplicationState createApplicationState() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public Set<NodeId> getRanNodes() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public RMAppMetrics getRMAppMetrics() {
+      return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0);
+    }
   }
 
   public static RMApp newApplication(int i) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Tue Aug 19 23:49:39 2014
@@ -19,41 +19,47 @@
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-
-import org.junit.Assert;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Test to restart the AM on failure.
- *
- */
 public class TestAMRestart {
 
-  @Test
+  @Test(timeout = 30000)
   public void testAMRestartWithExistingContainers() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -123,9 +129,9 @@ public class TestAMRestart {
         ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
     nm1.nodeHeartbeat(true);
     SchedulerApplicationAttempt schedulerAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
+        ((AbstractYarnScheduler) rm1.getResourceScheduler())
           .getCurrentAttemptForContainer(containerId6);
-    while (schedulerAttempt.getReservedContainers().size() == 0) {
+    while (schedulerAttempt.getReservedContainers().isEmpty()) {
       System.out.println("Waiting for container " + containerId6
           + " to be reserved.");
       nm1.nodeHeartbeat(true);
@@ -219,7 +225,7 @@ public class TestAMRestart {
 
     // record the scheduler attempt for testing.
     SchedulerApplicationAttempt schedulerNewAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
+        ((AbstractYarnScheduler) rm1.getResourceScheduler())
           .getCurrentAttemptForContainer(containerId2);
     // finish this application
     MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
@@ -245,7 +251,7 @@ public class TestAMRestart {
     }
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testNMTokensRebindOnAMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
@@ -264,31 +270,36 @@ public class TestAMRestart {
     nm2.registerNode();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    int NUM_CONTAINERS = 1;
     List<Container> containers = new ArrayList<Container>();
     // nmTokens keeps track of all the nmTokens issued in the allocate call.
     List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
 
-    // am1 allocate 1 container on nm1.
+    // am1 allocates 2 containers on nm1.
+    // first container
     while (true) {
       AllocateResponse response =
-          am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+          am1.allocate("127.0.0.1", 2000, 2,
             new ArrayList<ContainerId>());
       nm1.nodeHeartbeat(true);
       containers.addAll(response.getAllocatedContainers());
       expectedNMTokens.addAll(response.getNMTokens());
-      if (containers.size() == NUM_CONTAINERS) {
+      if (containers.size() == 2) {
         break;
       }
       Thread.sleep(200);
       System.out.println("Waiting for container to be allocated.");
     }
-    // launch the container
+    // launch the container-2
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
     ContainerId containerId2 =
         ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
     rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
+    // launch the container-3
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
+    ContainerId containerId3 =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+    
     // fail am1
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am1.waitForState(RMAppAttemptState.FAILED);
@@ -308,12 +319,12 @@ public class TestAMRestart {
     containers = new ArrayList<Container>();
     while (true) {
       AllocateResponse allocateResponse =
-          am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+          am2.allocate("127.1.1.1", 4000, 1,
             new ArrayList<ContainerId>());
       nm2.nodeHeartbeat(true);
       containers.addAll(allocateResponse.getAllocatedContainers());
       expectedNMTokens.addAll(allocateResponse.getNMTokens());
-      if (containers.size() == NUM_CONTAINERS) {
+      if (containers.size() == 1) {
         break;
       }
       Thread.sleep(200);
@@ -340,4 +351,237 @@ public class TestAMRestart {
     Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
     rm1.stop();
   }
+
+  // AM container preempted, nm disk failure
+  // should not be counted towards AM max retry count.
+  @Test(timeout = 100000)
+  public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+    // Preempt the first attempt;
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    am1.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    // AM should be restarted even though max-am-attempt is 1.
+    MockAM am2 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+    // Preempt the second attempt.
+    ContainerId amContainer2 =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
+    scheduler.killContainer(scheduler.getRMContainer(amContainer2));
+
+    am2.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am3 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
+    RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+
+    // mimic NM disk_failure
+    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+    containerStatus.setContainerId(attempt3.getMasterContainer().getId());
+    containerStatus.setDiagnostics("mimic NM disk_failure");
+    containerStatus.setState(ContainerState.COMPLETE);
+    containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
+    Map<ApplicationId, List<ContainerStatus>> conts =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    conts.put(app1.getApplicationId(),
+      Collections.singletonList(containerStatus));
+    nm1.nodeHeartbeat(conts, true);
+
+    am3.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
+      appState.getAttempt(am3.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am4 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+    RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // create second NM, and register to rm1
+    MockNM nm2 =
+        new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+    // nm1 heartbeats to report unhealthy
+    // This will mimic ContainerExitStatus.ABORTED
+    nm1.nodeHeartbeat(false);
+    am4.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.ABORTED,
+      appState.getAttempt(am4.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // launch next AM in nm2
+    nm2.nodeHeartbeat(true);
+    MockAM am5 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
+    RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
+    // fail the AM normally
+    nm2
+      .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am5.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
+
+    // AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    Assert.assertEquals(5, app1.getAppAttempts().size());
+    rm1.stop();
+  }
+
+  // Test RM restarts after AM container is preempted, new RM should not count
+  // AM preemption failure towards the max-retry count and should be able to
+  // re-launch the AM.
+  @Test(timeout = 20000)
+  public void testPreemptedAMRestartOnRMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+
+    // Forcibly preempt the am container;
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    am1.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // state store has 1 attempt stored.
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    Assert.assertEquals(1, appState.getAttemptCount());
+    // attempt stored has the preempted container exit status.
+    Assert.assertEquals(ContainerExitStatus.PREEMPTED,
+      appState.getAttempt(am1.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // Restart rm.
+    MockRM rm2 = new MockRM(conf, memStore);
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode();
+    rm2.start();
+
+    // Restarted RM should re-launch the am.
+    MockAM am2 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+    MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+    RMAppAttempt attempt2 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+          .getCurrentAppAttempt();
+    Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.INVALID,
+      appState.getAttempt(am2.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    rm1.stop();
+    rm2.stop();
+  }
+
+  // Test regular RM restart/failover, new RM should not count
+  // AM failure towards the max-retry count and should be able to
+  // re-launch the AM.
+  @Test(timeout = 50000)
+  public void testRMRestartOrFailoverNotCountedForAMFailures()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    // AM should be restarted even though max-am-attempt is 1.
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
+
+    // Restart rm.
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    // re-register the NM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+    status
+      .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    status.setContainerId(attempt1.getMasterContainer().getId());
+    status.setContainerState(ContainerState.COMPLETE);
+    status.setDiagnostics("");
+    nm1.registerNode(Collections.singletonList(status), null);
+
+    rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
+    Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      appState.getAttempt(am1.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // Will automatically start a new AppAttempt in rm2
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am2 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+    MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+    RMAppAttempt attempt3 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+          .getCurrentAppAttempt();
+    Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.INVALID,
+      appState.getAttempt(am2.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,25 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
@@ -27,12 +46,16 @@ import java.util.Random;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -52,22 +75,13 @@ import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 public class TestProportionalCapacityPreemptionPolicy {
 
   static final long TS = 3141592653L;
 
   int appAlloc = 0;
+  boolean setAMContainer = false;
+  float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
@@ -115,6 +129,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       {   0,  0,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -133,6 +148,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C  D
       { 100, 10, 40, 20, 30 },  // abs
+      { 100, 100, 100, 100, 100 },  // maxCap
       { 100, 30, 60, 10,  0 },  // used
       {  45, 20,  5, 20,  0 },  // pending
       {   0,  0,  0,  0,  0 },  // reserved
@@ -144,12 +160,33 @@ public class TestProportionalCapacityPre
     policy.editSchedule();
     verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
+  
+  @Test
+  public void testMaxCap() {
+    int[][] qData = new int[][]{
+        //  /   A   B   C
+        { 100, 40, 40, 20 },  // abs
+        { 100, 100, 45, 100 },  // maxCap
+        { 100, 55, 45,  0 },  // used
+        {  20, 10, 10,  0 },  // pending
+        {   0,  0,  0,  0 },  // reserved
+        {   2,  1,  1,  0 },  // apps
+        {  -1,  1,  1,  0 },  // req granularity
+        {   3,  0,  0,  0 },  // subqueues
+      };
+      ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+      policy.editSchedule();
+      // despite the imbalance, since B is at maxCap, do not correct
+      verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
 
+  
   @Test
   public void testPreemptCycle() {
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -169,6 +206,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -205,6 +243,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 39, 43, 21 },  // used
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -224,6 +263,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 55, 45,  0 },  // used
       {  20, 10, 10,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -242,6 +282,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 55, 45,  0 },  // used
       {  20, 10, 10,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -261,6 +302,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][]{
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 90, 10,  0 },  // used
       {  80, 10, 20, 50 },  // pending
       {   0,  0,  0,  0 },  // reserved
@@ -280,6 +322,7 @@ public class TestProportionalCapacityPre
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F
       { 200, 100, 50, 50, 100, 10, 90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
       { 200, 110, 60, 50,  90, 90,  0 },  // used
       {  10,   0,  0,  0,  10,  0, 10 },  // pending
       {   0,   0,  0,  0,   0,  0,  0 },  // reserved
@@ -295,10 +338,54 @@ public class TestProportionalCapacityPre
   }
 
   @Test
+  public void testZeroGuar() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F
+        { 200, 100, 0, 99, 100, 10, 90 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 170,  80, 60, 20,  90, 90,  0 },  // used
+        {  10,   0,  0,  0,  10,  0, 10 },  // pending
+        {   0,   0,  0,  0,   0,  0,  0 },  // reserved
+        {   4,   2,  1,  1,   2,  1,  1 },  // apps
+        {  -1,  -1,  1,  1,  -1,  1,  1 },  // req granularity
+        {   2,   2,  0,  0,   2,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from A1, not B1 despite B1 being far over
+    // its absolute guaranteed capacity
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+  
+  @Test
+  public void testZeroGuarOverCap() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F
+         { 200, 100, 0, 99, 0, 100, 100 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 170,  170, 60, 20, 90, 0,  0 },  // used
+        {  85,   50,  30,  10,  10,  20, 20 },  // pending
+        {   0,   0,  0,  0,   0,  0,  0 },  // reserved
+        {   4,   3,  1,  1,   1,  1,  1 },  // apps
+        {  -1,  -1,  1,  1,  1,  -1,  1 },  // req granularity
+        {   2,   3,  0,  0,   0,  1,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // we verify both that C has priority on B and D (as it has >0 guarantees)
+    // and that B and D are forced to share their over capacity fairly (as they
+    // are both zero-guarantees) hence D sees some of its containers preempted
+    verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+  
+  
+  
+  @Test
   public void testHierarchicalLarge() {
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F    G   H   I
-      { 400, 200, 60,140, 100, 70, 30, 100, 10, 90  },  // abs
+      { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90  },  // abs
+      { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, },  // maxCap
       { 400, 210, 70,140, 100, 50, 50,  90, 90,  0  },  // used
       {  10,   0,  0,  0,   0,  0,  0,   0,  0, 15  },  // pending
       {   0,   0,  0,  0,   0,  0,  0,   0,  0,  0  },  // reserved
@@ -351,7 +438,138 @@ public class TestProportionalCapacityPre
     assert containers.get(4).equals(rm5);
 
   }
-
+  
+  @Test
+  public void testPolicyInitializeAfterSchedulerInitialized() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    
+    @SuppressWarnings("resource")
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    
+    // ProportionalCapacityPreemptionPolicy should be initialized after
+    // CapacityScheduler initialized. We will 
+    // 1) find SchedulingMonitor from RMActiveService's service list, 
+    // 2) check if ResourceCalculator in policy is null or not. 
+    // If it's not null, we can come to a conclusion that policy initialized
+    // after scheduler got initialized
+    for (Service service : rm.getRMActiveService().getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        ProportionalCapacityPreemptionPolicy policy =
+            (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
+                .getSchedulingEditPolicy();
+        assertNotNull(policy.getResourceCalculator());
+        return;
+      }
+    }
+    
+    fail("Failed to find SchedulingMonitor service, please check what happened");
+  }
+  
+  @Test
+  public void testSkipAMContainer() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 2 containers from appB
+    // have to be preempted.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setAMContainer = false;
+  }
+  
+  @Test
+  public void testPreemptSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // All 5 containers of appD will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // All 5 containers of appC will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+  }
+  
+  @Test
+  public void testAMResourcePercentForSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setAMResourcePercent = 0.5f;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // AMResourcePercent is 50% of cluster and maxAMCapacity will be 5GB.
+    // Total used AM container size is 20GB, hence 2 AM containers have
+    // to be preempted as Queue Capacity is 10GB.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // Including the AM container, all 5 containers of appC will be
+    // preempted
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+  }
+  
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -382,24 +600,25 @@ public class TestProportionalCapacityPre
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
     Resource clusterResources =
-      Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
-    when(mCS.getClusterResources()).thenReturn(clusterResources);
+      Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
+    when(mCS.getClusterResource()).thenReturn(clusterResources);
     return policy;
   }
 
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
     int[] abs      = queueData[0];
-    int[] used     = queueData[1];
-    int[] pending  = queueData[2];
-    int[] reserved = queueData[3];
-    int[] apps     = queueData[4];
-    int[] gran     = queueData[5];
-    int[] queues   = queueData[6];
+    int[] maxCap   = queueData[1];
+    int[] used     = queueData[2];
+    int[] pending  = queueData[3];
+    int[] reserved = queueData[4];
+    int[] apps     = queueData[5];
+    int[] gran     = queueData[6];
+    int[] queues   = queueData[7];
 
-    return mockNested(abs, used, pending, reserved, apps, gran, queues);
+    return mockNested(abs, maxCap, used, pending,  reserved, apps, gran, queues);
   }
 
-  ParentQueue mockNested(int[] abs, int[] used,
+  ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
     float tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@@ -407,6 +626,8 @@ public class TestProportionalCapacityPre
     when(root.getQueueName()).thenReturn("/");
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+    when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+
     for (int i = 1; i < queues.length; ++i) {
       final CSQueue q;
       final ParentQueue p = pqs.removeLast();
@@ -420,6 +641,7 @@ public class TestProportionalCapacityPre
       when(q.getQueueName()).thenReturn(queueName);
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+      when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
     }
     assert 0 == pqs.size();
     return root;
@@ -439,7 +661,7 @@ public class TestProportionalCapacityPre
     return pq;
   }
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
     when(lq.getTotalResourcePending()).thenReturn(
@@ -464,6 +686,9 @@ public class TestProportionalCapacityPre
       }
     }
     when(lq.getApplications()).thenReturn(qApps);
+    if(setAMResourcePercent != 0.0f){
+      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    }
     p.getChildQueues().add(lq);
     return lq;
   }
@@ -488,7 +713,11 @@ public class TestProportionalCapacityPre
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
     for (int i = 0; i < used; i += gran) {
-      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      if(setAMContainer && i == 0){
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
+      }else{
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      }
       ++cAlloc;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
@@ -504,6 +733,10 @@ public class TestProportionalCapacityPre
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
+    when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    if(0 == priority){
+      when(mC.isAMContainer()).thenReturn(true);
+    }
     return mC;
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
 import javax.crypto.SecretKey;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,18 +55,21 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -74,10 +77,9 @@ public class RMStateStoreTestBase extend
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
   static class TestDispatcher implements
-      Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
+      Dispatcher, EventHandler<RMAppAttemptEvent> {
 
     ApplicationAttemptId attemptId;
-    Exception storedException;
 
     boolean notified = false;
 
@@ -88,9 +90,8 @@ public class RMStateStoreTestBase extend
     }
 
     @Override
-    public void handle(RMAppAttemptNewSavedEvent event) {
+    public void handle(RMAppAttemptEvent event) {
       assertEquals(attemptId, event.getApplicationAttemptId());
-      assertEquals(storedException, event.getStoredException());
       notified = true;
       synchronized (this) {
         notifyAll();
@@ -108,8 +109,8 @@ public class RMStateStoreTestBase extend
   interface RMStateStoreHelper {
     RMStateStore getRMStateStore() throws Exception;
     boolean isFinalStateValid() throws Exception;
-    void writeVersion(RMStateVersion version) throws Exception;
-    RMStateVersion getCurrentVersion() throws Exception;
+    void writeVersion(Version version) throws Exception;
+    Version getCurrentVersion() throws Exception;
     boolean appExists(RMApp app) throws Exception;
   }
 
@@ -160,7 +161,6 @@ public class RMStateStoreTestBase extend
     when(mockAttempt.getClientTokenMasterKey())
         .thenReturn(clientTokenMasterKey);
     dispatcher.attemptId = attemptId;
-    dispatcher.storedException = null;
     store.storeNewApplicationAttempt(mockAttempt);
     waitNotify(dispatcher);
     return container.getId();
@@ -175,8 +175,15 @@ public class RMStateStoreTestBase extend
     TestDispatcher dispatcher = new TestDispatcher();
     store.setRMDispatcher(dispatcher);
 
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+
     AMRMTokenSecretManager appTokenMgr =
-        new AMRMTokenSecretManager(conf);
+        spy(new AMRMTokenSecretManager(conf, rmContext));
+
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
     ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
         new ClientToAMTokenSecretManagerInRM();
 
@@ -188,8 +195,6 @@ public class RMStateStoreTestBase extend
     // create application token and client token key for attempt1
     Token<AMRMTokenIdentifier> appAttemptToken1 =
         generateAMRMToken(attemptId1, appTokenMgr);
-    HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
-    attemptTokenSet1.add(appAttemptToken1);
     SecretKey clientTokenKey1 =
         clientToAMTokenMgr.createMasterKey(attemptId1);
 
@@ -204,8 +209,6 @@ public class RMStateStoreTestBase extend
     // create application token and client token key for attempt2
     Token<AMRMTokenIdentifier> appAttemptToken2 =
         generateAMRMToken(attemptId2, appTokenMgr);
-    HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
-    attemptTokenSet2.add(appAttemptToken2);
     SecretKey clientTokenKey2 =
         clientToAMTokenMgr.createMasterKey(attemptId2);
 
@@ -267,12 +270,9 @@ public class RMStateStoreTestBase extend
     // attempt1 is loaded correctly
     assertNotNull(attemptState);
     assertEquals(attemptId1, attemptState.getAttemptId());
+    assertEquals(-1000, attemptState.getAMContainerExitStatus());
     // attempt1 container is loaded correctly
     assertEquals(containerId1, attemptState.getMasterContainer().getId());
-    // attempt1 applicationToken is loaded correctly
-    HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
-    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
-    assertEquals(attemptTokenSet1, savedTokens);
     // attempt1 client token master key is loaded correctly
     assertArrayEquals(clientTokenKey1.getEncoded(),
         attemptState.getAppAttemptCredentials()
@@ -284,10 +284,6 @@ public class RMStateStoreTestBase extend
     assertEquals(attemptId2, attemptState.getAttemptId());
     // attempt2 container is loaded correctly
     assertEquals(containerId2, attemptState.getMasterContainer().getId());
-    // attempt2 applicationToken is loaded correctly
-    savedTokens.clear();
-    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
-    assertEquals(attemptTokenSet2, savedTokens);
     // attempt2 client token master key is loaded correctly
     assertArrayEquals(clientTokenKey2.getEncoded(),
         attemptState.getAppAttemptCredentials()
@@ -308,7 +304,7 @@ public class RMStateStoreTestBase extend
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED);
+          FinalApplicationStatus.SUCCEEDED, 100);
     store.updateApplicationAttemptState(newAttemptState);
 
     // test updating the state of an app/attempt whose initial state was not
@@ -331,7 +327,7 @@ public class RMStateStoreTestBase extend
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED);
+          FinalApplicationStatus.SUCCEEDED, 111);
     store.updateApplicationAttemptState(dummyAttempt);
 
     // let things settle down
@@ -370,6 +366,7 @@ public class RMStateStoreTestBase extend
     assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
     assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
     assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
+    assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
     assertEquals(FinalApplicationStatus.SUCCEEDED,
       updatedAttemptState.getFinalApplicationStatus());
 
@@ -453,10 +450,8 @@ public class RMStateStoreTestBase extend
   private Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
-    AMRMTokenIdentifier appTokenId =
-        new AMRMTokenIdentifier(attemptId);
     Token<AMRMTokenIdentifier> appToken =
-        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+        appTokenMgr.createAndGetAMRMToken(attemptId);
     appToken.setService(new Text("appToken service"));
     return appToken;
   }
@@ -467,13 +462,13 @@ public class RMStateStoreTestBase extend
     store.setRMDispatcher(new TestDispatcher());
 
     // default version
-    RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion();
+    Version defaultVersion = stateStoreHelper.getCurrentVersion();
     store.checkVersion();
     Assert.assertEquals(defaultVersion, store.loadVersion());
 
     // compatible version
-    RMStateVersion compatibleVersion =
-        RMStateVersion.newInstance(defaultVersion.getMajorVersion(),
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
           defaultVersion.getMinorVersion() + 2);
     stateStoreHelper.writeVersion(compatibleVersion);
     Assert.assertEquals(compatibleVersion, store.loadVersion());
@@ -482,8 +477,8 @@ public class RMStateStoreTestBase extend
     Assert.assertEquals(defaultVersion, store.loadVersion());
 
     // incompatible version
-    RMStateVersion incompatibleVersion =
-        RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2,
+    Version incompatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion() + 2,
           defaultVersion.getMinorVersion());
     stateStoreHelper.writeVersion(incompatibleVersion);
     try {
@@ -493,21 +488,53 @@ public class RMStateStoreTestBase extend
       Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
     }
   }
+  
+  public void testEpoch(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(new TestDispatcher());
+    
+    int firstTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(0, firstTimeEpoch);
+    
+    int secondTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(1, secondTimeEpoch);
+    
+    int thirdTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(2, thirdTimeEpoch);
+  }
 
   public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
       throws Exception {
     RMStateStore store = stateStoreHelper.getRMStateStore();
     store.setRMDispatcher(new TestDispatcher());
-    // create and store apps
+    ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+
+    for (RMApp app : appList) {
+      // remove the app
+      store.removeApplication(app);
+      // wait for app to be removed.
+      while (true) {
+        if (!stateStoreHelper.appExists(app)) {
+          break;
+        } else {
+          Thread.sleep(100);
+        }
+      }
+    }
+  }
+
+  private ArrayList<RMApp> createAndStoreApps(
+      RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
+      throws Exception {
     ArrayList<RMApp> appList = new ArrayList<RMApp>();
-    int NUM_APPS = 5;
-    for (int i = 0; i < NUM_APPS; i++) {
+    for (int i = 0; i < numApps; i++) {
       ApplicationId appId = ApplicationId.newInstance(1383183338, i);
       RMApp app = storeApp(store, appId, 123456789, 987654321);
       appList.add(app);
     }
 
-    Assert.assertEquals(NUM_APPS, appList.size());
+    Assert.assertEquals(numApps, appList.size());
     for (RMApp app : appList) {
       // wait for app to be stored.
       while (true) {
@@ -518,18 +545,17 @@ public class RMStateStoreTestBase extend
         }
       }
     }
+    return appList;
+  }
 
+  public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+    store.deleteStore();
+    // verify apps deleted
     for (RMApp app : appList) {
-      // remove the app
-      store.removeApplication(app);
-      // wait for app to be removed.
-      while (true) {
-        if (!stateStoreHelper.appExists(app)) {
-          break;
-        } else {
-          Thread.sleep(100);
-        }
-      }
+      Assert.assertFalse(stateStoreHelper.appExists(app));
     }
   }
 
@@ -541,4 +567,65 @@ public class RMStateStoreTestBase extend
 
   }
 
+  // Stores AMRMTokenSecretManager state, reloads it through a store obtained
+  // from the helper, and checks the master keys and recovered secret keys.
+  public void testAMRMTokenSecretManagerStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    System.out.println("Start testing");
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+    Configuration conf = new YarnConfiguration();
+    AMRMTokenSecretManager appTokenMgr =
+        new AMRMTokenSecretManager(conf, rmContext);
+
+    // create and save the first master key (stored with a null next key)
+    MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey();
+
+    AMRMTokenSecretManagerState state1 = AMRMTokenSecretManagerState
+        .newInstance(firstMasterKeyData.getMasterKey(), null);
+    rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state1,
+      false);
+
+    // load state
+    store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    Assert.assertNotNull(state.getAMRMTokenSecretManagerState());
+    Assert.assertEquals(firstMasterKeyData.getMasterKey(), state
+      .getAMRMTokenSecretManagerState().getCurrentMasterKey());
+    Assert.assertNull(state
+      .getAMRMTokenSecretManagerState().getNextMasterKey());
+
+    // create and save the second master key as the next key (update = true)
+    MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey();
+    AMRMTokenSecretManagerState state2 = AMRMTokenSecretManagerState
+        .newInstance(firstMasterKeyData.getMasterKey(),
+            secondMasterKeyData.getMasterKey());
+    rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state2,
+      true);
+
+    // load state
+    store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(dispatcher);
+    RMState state_2 = store.loadState();
+    Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState());
+    Assert.assertEquals(firstMasterKeyData.getMasterKey(), state_2
+      .getAMRMTokenSecretManagerState().getCurrentMasterKey());
+    Assert.assertEquals(secondMasterKeyData.getMasterKey(), state_2
+      .getAMRMTokenSecretManagerState().getNextMasterKey());
+
+    // re-create the masterKeyData from the recovered master keys; it should
+    // have the same secretKey (expected value first, then the recovered one)
+    appTokenMgr.recover(state_2);
+    Assert.assertEquals(firstMasterKeyData.getSecretKey(),
+      appTokenMgr.getCurrnetMasterKeyData().getSecretKey());
+    Assert.assertEquals(secondMasterKeyData.getSecretKey(),
+      appTokenMgr.getNextMasterKeyData().getSecretKey());
+
+    store.close();
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,9 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -71,7 +70,7 @@ public class TestFSRMStateStore extends 
         return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
       }
 
-      public RMStateVersion getCurrentVersion() {
+      public Version getCurrentVersion() {
         return CURRENT_VERSION_INFO;
       }
 
@@ -112,13 +111,13 @@ public class TestFSRMStateStore extends 
     }
 
     @Override
-    public void writeVersion(RMStateVersion version) throws Exception {
-      store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+    public void writeVersion(Version version) throws Exception {
+      store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
         .getProto().toByteArray());
     }
 
     @Override
-    public RMStateVersion getCurrentVersion() throws Exception {
+    public Version getCurrentVersion() throws Exception {
       return store.getCurrentVersion();
     }
 
@@ -158,7 +157,10 @@ public class TestFSRMStateStore extends 
           .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
       testCheckVersion(fsTester);
+      testEpoch(fsTester);
       testAppDeletion(fsTester);
+      testDeleteStore(fsTester);
+      testAMRMTokenSecretManagerStateStore(fsTester);
     } finally {
       cluster.shutdown();
     }
@@ -213,9 +215,8 @@ public class TestFSRMStateStore extends 
           try {
             store.storeApplicationStateInternal(
                 ApplicationId.newInstance(100L, 1),
-                (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-                    .newApplicationStateData(111, 111, "user", null,
-                        RMAppState.ACCEPTED, "diagnostics", 333));
+                ApplicationStateData.newInstance(111, 111, "user", null,
+                    RMAppState.ACCEPTED, "diagnostics", 333));
           } catch (Exception e) {
             // TODO 0 datanode exception will not be retried by dfs client, fix
             // that separately.

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -32,9 +32,9 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
@@ -69,7 +69,7 @@ public class TestZKRMStateStore extends 
         return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
       }
 
-      public RMStateVersion getCurrentVersion() {
+      public Version getCurrentVersion() {
         return CURRENT_VERSION_INFO;
       }
 
@@ -96,13 +96,13 @@ public class TestZKRMStateStore extends 
     }
 
     @Override
-    public void writeVersion(RMStateVersion version) throws Exception {
-      client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+    public void writeVersion(Version version) throws Exception {
+      client.setData(store.getVersionNode(), ((VersionPBImpl) version)
         .getProto().toByteArray(), -1);
     }
 
     @Override
-    public RMStateVersion getCurrentVersion() throws Exception {
+    public Version getCurrentVersion() throws Exception {
       return store.getCurrentVersion();
     }
 
@@ -120,7 +120,10 @@ public class TestZKRMStateStore extends 
     testRMAppStateStore(zkTester);
     testRMDTSecretManagerStateStore(zkTester);
     testCheckVersion(zkTester);
+    testEpoch(zkTester);
     testAppDeletion(zkTester);
+    testDeleteStore(zkTester);
+    testAMRMTokenSecretManagerStateStore(zkTester);
   }
 
   private Configuration createHARMConf(

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Tue Aug 19 23:49:39 2014
@@ -41,6 +41,7 @@ import java.security.NoSuchAlgorithmExce
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -203,7 +204,7 @@ public class TestZKRMStateStoreZKClientC
       LOG.error(error, e);
       fail(error);
     }
-    Assert.assertEquals("newBytes", new String(ret));
+    assertEquals("newBytes", new String(ret));
   }
 
   @Test(timeout = 20000)
@@ -232,7 +233,7 @@ public class TestZKRMStateStoreZKClientC
 
     try {
       byte[] ret = store.getDataWithRetries(path, false);
-      Assert.assertEquals("bytes", new String(ret));
+      assertEquals("bytes", new String(ret));
     } catch (Exception e) {
       String error = "New session creation failed";
       LOG.error(error, e);
@@ -281,4 +282,24 @@ public class TestZKRMStateStoreZKClientC
 
     zkClientTester.getRMStateStore(conf);
   }
+
+  @Test
+  public void testZKRetryInterval() throws Exception {
+    TestZKClient clientHelper = new TestZKClient();
+    YarnConfiguration yarnConf = new YarnConfiguration();
+    // Before RM_HA_ENABLED is set, expect the default retry interval.
+    ZKRMStateStore plainStore =
+        (ZKRMStateStore) clientHelper.getRMStateStore(yarnConf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
+        plainStore.zkRetryInterval);
+    plainStore.stop();
+    // With RM_HA_ENABLED set, expect default timeout / default retry count.
+    yarnConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    ZKRMStateStore haStore =
+        (ZKRMStateStore) clientHelper.getRMStateStore(yarnConf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
+            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
+        haStore.zkRetryInterval);
+    haStore.stop();
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -232,4 +234,18 @@ public class MockRMApp implements RMApp 
   public YarnApplicationState createApplicationState() {
     return null;
   }
+
+  @Override
+  public Set<NodeId> getRanNodes() {
+    return null;
+  }
+  
+  public Resource getResourcePreempted() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public RMAppMetrics getRMAppMetrics() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Aug 19 23:49:39 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -192,7 +192,7 @@ public class TestRMAppTransitions {
     this.rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
-          null, new AMRMTokenSecretManager(conf),
+          null, new AMRMTokenSecretManager(conf, this.rmContext),
           new RMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInRM(conf),
           new ClientToAMTokenSecretManagerInRM(),
@@ -327,15 +327,15 @@ public class TestRMAppTransitions {
 
   private void sendAppUpdateSavedEvent(RMApp application) {
     RMAppEvent event =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(event);
     rmDispatcher.await();
   }
 
   private void sendAttemptUpdateSavedEvent(RMApp application) {
     application.getCurrentAppAttempt().handle(
-      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
-        .getAppAttemptId(), null));
+        new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
   }
 
   protected RMApp testCreateAppNewSaving(
@@ -356,7 +356,7 @@ public class TestRMAppTransitions {
   RMApp application = testCreateAppNewSaving(submissionContext);
     // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
     RMAppEvent event =
-        new RMAppNewSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
@@ -421,7 +421,7 @@ public class TestRMAppTransitions {
     RMApp application = testCreateAppFinalSaving(submissionContext);
     // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
     RMAppEvent appUpdated =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(appUpdated);
     assertAppState(RMAppState.FINISHING, application);
     assertTimesAtFinish(application);
@@ -762,7 +762,7 @@ public class TestRMAppTransitions {
     application.handle(event);
     assertAppState(RMAppState.FINAL_SAVING, application);
     RMAppEvent appUpdated =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(appUpdated);
     assertAppState(RMAppState.FINISHED, application);
 
@@ -921,6 +921,7 @@ public class TestRMAppTransitions {
     assertAppState(RMAppState.NEW, app);
     ApplicationReport report = app.createAndGetApplicationReport(null, true);
     Assert.assertNotNull(report.getApplicationResourceUsageReport());
+    Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
     report = app.createAndGetApplicationReport("clientuser", true);
     Assert.assertNotNull(report.getApplicationResourceUsageReport());
   }