You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/07 18:29:20 UTC

svn commit: r1601151 [3/5] - in /hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yar...

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Sat Jun  7 16:29:10 2014
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
@@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -188,7 +194,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService locService =
       spy(new ResourceLocalizationService(dispatcher, exec, delService,
-                                          diskhandler));
+                                          diskhandler,
+                                          new NMNullStateStoreService()));
     doReturn(lfs)
       .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -253,7 +260,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -287,7 +295,7 @@ public class TestResourceLocalizationSer
               user, appId);
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
       
       // init resources
       Random r = new Random();
@@ -402,6 +410,233 @@ public class TestResourceLocalizationSer
     }
   }
   
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testRecovery() throws Exception {
+    final String user1 = "user1";
+    final String user2 = "user2";
+    final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    final ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    //Ignore actual localization
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    ResourceLocalizationService spyService =
+        createSpyService(dispatcher, dirsHandler, stateStore);
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final Application app1 = mock(Application.class);
+      when(app1.getUser()).thenReturn(user1);
+      when(app1.getAppId()).thenReturn(appId1);
+      final Application app2 = mock(Application.class);
+      when(app2.getUser()).thenReturn(user2);
+      when(app2.getAppId()).thenReturn(appId2);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
+      dispatcher.await();
+
+      //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
+      LocalResourcesTracker appTracker1 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      LocalResourcesTracker privTracker1 =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+              user1, null);
+      LocalResourcesTracker appTracker2 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+              null, null);
+
+      // init containers
+      final Container c1 = getMockContainer(appId1, 1, user1);
+      final Container c2 = getMockContainer(appId2, 2, user2);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+
+      // Send localization requests of each type.
+      final LocalResource privResource1 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq1 =
+          new LocalResourceRequest(privResource1);
+      final LocalResource privResource2 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq2 =
+          new LocalResourceRequest(privResource2);
+
+      final LocalResource pubResource1 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq1 =
+          new LocalResourceRequest(pubResource1);
+      final LocalResource pubResource2 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq2 =
+          new LocalResourceRequest(pubResource2);
+
+      final LocalResource appResource1 = getAppMockedResource(r);
+      final LocalResourceRequest appReq1 =
+          new LocalResourceRequest(appResource1);
+      final LocalResource appResource2 = getAppMockedResource(r);
+      final LocalResourceRequest appReq2 =
+          new LocalResourceRequest(appResource2);
+      final LocalResource appResource3 = getAppMockedResource(r);
+      final LocalResourceRequest appReq3 =
+          new LocalResourceRequest(appResource3);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
+          new HashMap<LocalResourceVisibility,
+                      Collection<LocalResourceRequest>>();
+      req1.put(LocalResourceVisibility.PRIVATE,
+          Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
+      req1.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq1));
+      req1.put(LocalResourceVisibility.APPLICATION,
+          Collections.singletonList(appReq1));
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+        new HashMap<LocalResourceVisibility,
+                    Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.APPLICATION,
+          Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
+      req2.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq2));
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
+      spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
+      dispatcher.await();
+
+      // Simulate start of localization for all resources
+      privTracker1.getPathForLocalization(privReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      privTracker1.getPathForLocalization(privReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
+      LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
+      appTracker1.getPathForLocalization(appReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId1));
+      LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
+      appTracker2.getPathForLocalization(appReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
+      appTracker2.getPathForLocalization(appReq3,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
+      pubTracker.getPathForLocalization(pubReq1,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
+      pubTracker.getPathForLocalization(pubReq2,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
+
+      // Simulate completion of localization for most resources with
+      // possibly different sizes than in the request
+      assertNotNull("Localization not started", privLr1.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq1,
+          privLr1.getLocalPath(), privLr1.getSize() + 5));
+      assertNotNull("Localization not started", privLr2.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq2,
+          privLr2.getLocalPath(), privLr2.getSize() + 10));
+      assertNotNull("Localization not started", appLr1.getLocalPath());
+      appTracker1.handle(new ResourceLocalizedEvent(appReq1,
+          appLr1.getLocalPath(), appLr1.getSize()));
+      assertNotNull("Localization not started", appLr3.getLocalPath());
+      appTracker2.handle(new ResourceLocalizedEvent(appReq3,
+          appLr3.getLocalPath(), appLr3.getSize() + 7));
+      assertNotNull("Localization not started", pubLr1.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
+          pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
+      assertNotNull("Localization not started", pubLr2.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
+          pubLr2.getLocalPath(), pubLr2.getSize() + 99999));
+
+      dispatcher.await();
+      assertEquals(ResourceState.LOCALIZED, privLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, privLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr1.getState());
+      assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr3.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr2.getState());
+
+      // restart and recover
+      spyService = createSpyService(dispatcher, dirsHandler, stateStore);
+      spyService.init(conf);
+      spyService.recoverLocalizedResources(
+          stateStore.loadLocalizationState());
+      dispatcher.await();
+
+      appTracker1 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      privTracker1 = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PRIVATE, user1, null);
+      appTracker2 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      pubTracker = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PUBLIC, null, null);
+
+      LocalizedResource recoveredRsrc =
+          privTracker1.getLocalizedResource(privReq1);
+      assertEquals(privReq1, recoveredRsrc.getRequest());
+      assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
+      assertEquals(privReq2, recoveredRsrc.getRequest());
+      assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
+      assertEquals(appReq1, recoveredRsrc.getRequest());
+      assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
+      assertNull("in-progress resource should not be present", recoveredRsrc);
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
+      assertEquals(appReq3, recoveredRsrc.getRequest());
+      assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+    } finally {
+      dispatcher.stop();
+      stateStore.close();
+    }
+  }
+
   @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
@@ -436,7 +671,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -469,7 +705,7 @@ public class TestResourceLocalizationSer
       long seed = r.nextLong();
       System.out.println("SEED: " + seed);
       r.setSeed(seed);
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, "user0");
       FSDataOutputStream out =
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
@@ -616,7 +852,8 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-                                        dirsHandler);
+                                        dirsHandler,
+                                        new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -637,7 +874,7 @@ public class TestResourceLocalizationSer
       dispatcher.await();
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // init resources
       Random r = new Random();
@@ -725,7 +962,7 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-            dirsHandlerSpy);
+            dirsHandlerSpy, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -758,7 +995,7 @@ public class TestResourceLocalizationSer
         .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // first test ioexception
       Mockito
@@ -838,7 +1075,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -991,7 +1228,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1157,7 +1394,7 @@ public class TestResourceLocalizationSer
       // it as otherwise it will remove requests from pending queue.
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            dirsHandler);
+            dirsHandler, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       dispatcher1.register(LocalizationEventType.class, spyService);
       spyService.init(conf);
@@ -1424,12 +1661,13 @@ public class TestResourceLocalizationSer
     return getMockedResource(r, LocalResourceVisibility.PRIVATE);
   }
 
-  private static Container getMockContainer(ApplicationId appId, int id) {
+  private static Container getMockContainer(ApplicationId appId, int id,
+      String user) {
     Container c = mock(Container.class);
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(appId, 1);
     ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
-    when(c.getUser()).thenReturn("user0");
+    when(c.getUser()).thenReturn(user);
     when(c.getContainerId()).thenReturn(cId);
     Credentials creds = new Credentials();
     creds.addToken(new Text("tok" + id), getToken(id));
@@ -1438,6 +1676,24 @@ public class TestResourceLocalizationSer
     return c;
   }
 
+  private ResourceLocalizationService createSpyService(
+      DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
+      NMStateStoreService stateStore) {
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DeletionService delService = mock(DeletionService.class);
+    ResourceLocalizationService rawService =
+      new ResourceLocalizationService(dispatcher, exec, delService,
+                                      dirsHandler, stateStore);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+        isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+        .getLocalFileContext(isA(Configuration.class));
+    return spyService;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   static Token<? extends TokenIdentifier> getToken(int id) {
     return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Sat Jun  7 16:29:10 2014
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 import org.mockito.ArgumentCaptor;
+
 import static org.mockito.Mockito.*;
 
 public class TestResourceRetention {
@@ -81,7 +83,7 @@ public class TestResourceRetention {
     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
       new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-          trackerResources, false, conf));
+      null, trackerResources, false, conf, new NMNullStateStoreService()));
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Sat Jun  7 16:29:10 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public class MockContainer implements Co
   public ContainerTokenIdentifier getContainerTokenIdentifier() {
     return this.containerTokenIdentifier;
   }
+
+  @Override
+  public NMContainerStatus getNMContainerStatus() {
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java Sat Jun  7 16:29:10 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -77,7 +78,8 @@ public class TestContainerLogsPage {
     NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
-    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf));
+    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+        new ApplicationACLsManager(conf), new NMNullStateStoreService());
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Sat Jun  7 16:29:10 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,7 +79,8 @@ public class TestNMWebServer {
   }
   
   private int startNMWebAppServer(String webAddr) {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -135,7 +138,8 @@ public class TestNMWebServer {
 
   @Test
   public void testNMWebApp() throws IOException, YarnException {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -185,6 +189,7 @@ public class TestNMWebServer {
     ContainerId container2 =
         BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
     NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
+    NMStateStoreService stateStore = new NMNullStateStoreService();
     for (ContainerId containerId : new ContainerId[] { container1,
         container2}) {
       // TODO: Use builder utils

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java Sat Jun  7 16:29:10 2014
@@ -107,7 +107,8 @@ public class TestNMWebServices extends J
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Sat Jun  7 16:29:10 2014
@@ -99,7 +99,8 @@ public class TestNMWebServicesApps exten
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Sat Jun  7 16:29:10 2014
@@ -122,7 +122,8 @@ public class TestNMWebServicesContainers
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sat Jun  7 16:29:10 2014
@@ -401,6 +401,8 @@ public class ResourceManager extends Com
 
       // Initialize the scheduler
       scheduler = createScheduler();
+      scheduler.setRMContext(rmContext);
+      addIfService(scheduler);
       rmContext.setScheduler(scheduler);
 
       schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +431,6 @@ public class ResourceManager extends Com
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
 
-      try {
-        scheduler.reinitialize(conf, rmContext);
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to initialize scheduler", ioe);
-      }
-
       // creating monitors that handle preemption
       createPolicyMonitors();
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Sat Jun  7 16:29:10 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -195,7 +195,7 @@ public class ResourceTrackerService exte
    */
   @SuppressWarnings("unchecked")
   @VisibleForTesting
-  void handleContainerStatus(ContainerStatus containerStatus) {
+  void handleNMContainerStatus(NMContainerStatus containerStatus) {
     ApplicationAttemptId appAttemptId =
         containerStatus.getContainerId().getApplicationAttemptId();
     RMApp rmApp =
@@ -219,11 +219,14 @@ public class ResourceTrackerService exte
     RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
     Container masterContainer = rmAppAttempt.getMasterContainer();
     if (masterContainer.getId().equals(containerStatus.getContainerId())
-        && containerStatus.getState() == ContainerState.COMPLETE) {
+        && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+      ContainerStatus status =
+          ContainerStatus.newInstance(containerStatus.getContainerId(),
+            containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+            containerStatus.getContainerExitStatus());
       // sending master container finished event.
       RMAppAttemptContainerFinishedEvent evt =
-          new RMAppAttemptContainerFinishedEvent(appAttemptId,
-              containerStatus);
+          new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
       rmContext.getDispatcher().getEventHandler().handle(evt);
     }
   }
@@ -240,11 +243,11 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
-    if (!request.getContainerStatuses().isEmpty()) {
+    if (!request.getNMContainerStatuses().isEmpty()) {
       LOG.info("received container statuses on node manager register :"
-          + request.getContainerStatuses());
-      for (ContainerStatus containerStatus : request.getContainerStatuses()) {
-        handleContainerStatus(containerStatus);
+          + request.getNMContainerStatuses());
+      for (NMContainerStatus report : request.getNMContainerStatuses()) {
+        handleNMContainerStatus(report);
       }
     }
     RegisterNodeManagerResponse response = recordFactory

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sat Jun  7 16:29:10 2014
@@ -90,7 +90,9 @@ public class ZKRMStateStore extends RMSt
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
-  private long zkRetryInterval;
+
+  @VisibleForTesting
+  long zkRetryInterval;
   private List<ACL> zkAcl;
   private List<ZKUtil.ZKAuthInfo> zkAuths;
 
@@ -199,9 +201,14 @@ public class ZKRMStateStore extends RMSt
     zkSessionTimeout =
         conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
             YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-    zkRetryInterval =
-        conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+    if (HAUtil.isHAEnabled(conf)) {
+      zkRetryInterval = zkSessionTimeout / numRetries;
+    } else {
+      zkRetryInterval =
+          conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+    }
 
     zkAcl = RMZKUtils.getZKAcls(conf);
     zkAuths = RMZKUtils.getZKAuths(conf);
@@ -539,7 +546,7 @@ public class ZKRMStateStore extends RMSt
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }
-    LOG.info("Done Loading applications from ZK state store");
+    LOG.debug("Done Loading applications from ZK state store");
   }
 
   @Override
@@ -572,7 +579,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, appStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appId + " znode didn't exist. Created a new znode to"
           + " update the application state.");
     }
   }
@@ -615,7 +622,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
           + " update the application attempt state.");
     }
   }
@@ -664,7 +671,7 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       opList.add(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
     doMultiWithRetries(opList);
   }
@@ -681,7 +688,7 @@ public class ZKRMStateStore extends RMSt
       // in case znode doesn't exist
       addStoreOrUpdateOps(
           opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
-      LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
     } else {
       // in case znode exists
       addStoreOrUpdateOps(
@@ -763,7 +770,7 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       doMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
   }
 
@@ -816,7 +823,7 @@ public class ZKRMStateStore extends RMSt
         case Expired:
           // the connection got terminated because of session timeout
           // call listener to reconnect
-          LOG.info("Session expired");
+          LOG.info("ZKRMStateStore Session expired");
           createConnection();
           break;
         default:
@@ -991,13 +998,13 @@ public class ZKRMStateStore extends RMSt
             throw new StoreFencedException();
           }
         } catch (KeeperException ke) {
+          LOG.info("Exception while executing a ZK operation.", ke);
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
-            LOG.info("Waiting for zookeeper to be connected, retry no. + "
-                + retry);
+            LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
             continue;
           }
-          LOG.debug("Error while doing ZK operation.", ke);
+          LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;
         }
       }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Jun  7 16:29:10 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.util.resou
 
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
-    implements ResourceScheduler {
+    extends AbstractService implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
 
@@ -62,6 +63,15 @@ public abstract class AbstractYarnSchedu
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
     EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sat Jun  7 16:29:10 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Set RMContext for <code>ResourceScheduler</code>.
+   * This method should be called immediately after instantiating
+   * a scheduler once.
+   * @param rmContext created by ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Sat Jun  7 16:29:10 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -76,6 +77,9 @@ public class SchedulerApplicationAttempt
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
   protected Resource currentConsumption = Resource.newInstance(0, 0);
+  private Resource amResource;
+  private boolean unmanagedAM = true;
+  private boolean amRunning = false;
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -106,6 +110,17 @@ public class SchedulerApplicationAttempt
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager);
     this.queue = queue;
+
+    if (rmContext != null && rmContext.getRMApps() != null &&
+        rmContext.getRMApps()
+            .containsKey(applicationAttemptId.getApplicationId())) {
+      ApplicationSubmissionContext appSubmissionContext =
+          rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+              .getApplicationSubmissionContext();
+      if (appSubmissionContext != null) {
+        unmanagedAM = appSubmissionContext.getUnmanagedAM();
+      }
+    }
   }
   
   /**
@@ -168,6 +183,26 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getQueueName();
   }
   
+  public Resource getAMResource() {
+    return amResource;
+  }
+
+  public void setAMResource(Resource amResource) {
+    this.amResource = amResource;
+  }
+
+  public boolean isAmRunning() {
+    return amRunning;
+  }
+
+  public void setAmRunning(boolean bool) {
+    amRunning = bool;
+  }
+
+  public boolean getUnmanagedAM() {
+    return unmanagedAM;
+  }
+
   public synchronized RMContainer getRMContainer(ContainerId id) {
     return liveContainers.get(id);
   }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Jun  7 16:29:10 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private CSQueue root;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
   static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
     @Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
 
   private int numNodeManagers = 0;
 
-  private boolean initialized = false;
-
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
-  public CapacityScheduler() {}
+  public CapacityScheduler() {
+    super(CapacityScheduler.class.getName());
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public RMContext getRMContext() {
+  public synchronized RMContext getRMContext() {
     return this.rmContext;
   }
-  
+
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  private synchronized void initScheduler(Configuration configuration) throws
+      IOException {
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    this.minimumAllocation = this.conf.getMinimumAllocation();
+    this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.calculator = this.conf.getResourceCalculator();
+    this.usePortForNodeName = this.conf.getUsePortForNodeName();
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,
+            SchedulerApplication<FiCaSchedulerApp>>();
+
+    initializeQueues(this.conf);
+
+    scheduleAsynchronously = this.conf.getScheduleAynschronously();
+    asyncScheduleInterval =
+        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+            DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with " +
+        "calculator=" + getResourceCalculator().getClass() + ", " +
+        "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+        "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+        "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+        "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  private synchronized void startSchedulerThreads() {
+    if (scheduleAsynchronously) {
+      Preconditions.checkNotNull(asyncSchedulerThread,
+          "asyncSchedulerThread is null");
+      asyncSchedulerThread.start();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    if (!initialized) {
-      this.rmContext = rmContext;
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      this.minimumAllocation = this.conf.getMinimumAllocation();
-      this.maximumAllocation = this.conf.getMaximumAllocation();
-      this.calculator = this.conf.getResourceCalculator();
-      this.usePortForNodeName = this.conf.getUsePortForNodeName();
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    initScheduler(configuration);
+    super.serviceInit(conf);
+  }
 
-      initializeQueues(this.conf);
-      
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = 
-          this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, 
-              DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-      if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
-        asyncSchedulerThread.start();
-      }
-      
-      initialized = true;
-      LOG.info("Initialized CapacityScheduler with " +
-          "calculator=" + getResourceCalculator().getClass() + ", " +
-          "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-          "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
-          "asynchronousScheduling=" + scheduleAsynchronously + ", " +
-          "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-      
-    } else {
-      CapacitySchedulerConfiguration oldConf = this.conf; 
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      try {
-        LOG.info("Re-initializing queues...");
-        reinitializeQueues(this.conf);
-      } catch (Throwable t) {
-        this.conf = oldConf;
-        throw new IOException("Failed to re-init queues", t);
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        asyncSchedulerThread.interrupt();
+        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
     }
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void
+  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+    Configuration configuration = new Configuration(conf);
+    CapacitySchedulerConfiguration oldConf = this.conf;
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    try {
+      LOG.info("Re-initializing queues...");
+      reinitializeQueues(this.conf);
+    } catch (Throwable t) {
+      this.conf = oldConf;
+      throw new IOException("Failed to re-init queues", t);
+    }
   }
   
   long getAsyncScheduleInterval() {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Sat Jun  7 16:29:10 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
   private final int userMaxAppsDefault;
   private final int queueMaxAppsDefault;
 
+  // Maximum resource share for each leaf queue that can be used to run AMs
+  final Map<String, Float> queueMaxAMShares;
+  private final float queueMaxAMShareDefault;
+
   // ACL's for each queue. Only specifies non-default ACL's from configuration.
   private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
 
@@ -84,8 +88,9 @@ public class AllocationConfiguration {
   public AllocationConfiguration(Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources,
       Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-      Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-      int queueMaxAppsDefault,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+      int queueMaxAppsDefault, float queueMaxAMShareDefault,
       Map<String, SchedulingPolicy> schedulingPolicies,
       SchedulingPolicy defaultSchedulingPolicy,
       Map<String, Long> minSharePreemptionTimeouts,
@@ -97,9 +102,11 @@ public class AllocationConfiguration {
     this.maxQueueResources = maxQueueResources;
     this.queueMaxApps = queueMaxApps;
     this.userMaxApps = userMaxApps;
+    this.queueMaxAMShares = queueMaxAMShares;
     this.queueWeights = queueWeights;
     this.userMaxAppsDefault = userMaxAppsDefault;
     this.queueMaxAppsDefault = queueMaxAppsDefault;
+    this.queueMaxAMShareDefault = queueMaxAMShareDefault;
     this.defaultSchedulingPolicy = defaultSchedulingPolicy;
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -116,8 +123,10 @@ public class AllocationConfiguration {
     queueWeights = new HashMap<String, ResourceWeights>();
     queueMaxApps = new HashMap<String, Integer>();
     userMaxApps = new HashMap<String, Integer>();
+    queueMaxAMShares = new HashMap<String, Float>();
     userMaxAppsDefault = Integer.MAX_VALUE;
     queueMaxAppsDefault = Integer.MAX_VALUE;
+    queueMaxAMShareDefault = 1.0f;
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@@ -184,6 +193,11 @@ public class AllocationConfiguration {
     return (maxApps == null) ? queueMaxAppsDefault : maxApps;
   }
   
+  public float getQueueMaxAMShare(String queue) {
+    Float maxAMShare = queueMaxAMShares.get(queue);
+    return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+  }
+
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Sat Jun  7 16:29:10 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-  
+
+  public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   private final Clock clock;
 
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
   }
   
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     this.allocFile = getAllocationFile(conf);
-    super.init(conf);
-  }
-  
-  @Override
-  public void start() {
-    if (allocFile == null) {
-      return;
-    }
-    reloadThread = new Thread() {
-      public void run() {
-        while (running) {
-          long time = clock.getTime();
-          long lastModified = allocFile.lastModified();
-          if (lastModified > lastSuccessfulReload &&
-              time > lastModified + ALLOC_RELOAD_WAIT_MS) {
-            try {
-              reloadAllocations();
-            } catch (Exception ex) {
+    if (allocFile != null) {
+      reloadThread = new Thread() {
+        @Override
+        public void run() {
+          while (running) {
+            long time = clock.getTime();
+            long lastModified = allocFile.lastModified();
+            if (lastModified > lastSuccessfulReload &&
+                time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+              try {
+                reloadAllocations();
+              } catch (Exception ex) {
+                if (!lastReloadAttemptFailed) {
+                  LOG.error("Failed to reload fair scheduler config file - " +
+                      "will use existing allocations.", ex);
+                }
+                lastReloadAttemptFailed = true;
+              }
+            } else if (lastModified == 0l) {
               if (!lastReloadAttemptFailed) {
-                LOG.error("Failed to reload fair scheduler config file - " +
-                    "will use existing allocations.", ex);
+                LOG.warn("Failed to reload fair scheduler config file because" +
+                    " last modified returned 0. File exists: "
+                    + allocFile.exists());
               }
               lastReloadAttemptFailed = true;
             }
-          } else if (lastModified == 0l) {
-            if (!lastReloadAttemptFailed) {
-              LOG.warn("Failed to reload fair scheduler config file because" +
-                  " last modified returned 0. File exists: " + allocFile.exists());
+            try {
+              Thread.sleep(reloadIntervalMs);
+            } catch (InterruptedException ex) {
+              LOG.info(
+                  "Interrupted while waiting to reload alloc configuration");
             }
-            lastReloadAttemptFailed = true;
-          }
-          try {
-            Thread.sleep(reloadIntervalMs);
-          } catch (InterruptedException ex) {
-            LOG.info("Interrupted while waiting to reload alloc configuration");
           }
         }
-      }
-    };
-    reloadThread.setName("AllocationFileReloader");
-    reloadThread.setDaemon(true);
-    reloadThread.start();
-    super.start();
+      };
+      reloadThread.setName("AllocationFileReloader");
+      reloadThread.setDaemon(true);
+    }
+    super.serviceInit(conf);
   }
   
   @Override
-  public void stop() {
+  public void serviceStart() throws Exception {
+    if (reloadThread != null) {
+      reloadThread.start();
+    }
+    super.serviceStart();
+  }
+  
+  @Override
+  public void serviceStop() throws Exception {
     running = false;
-    reloadThread.interrupt();
-    super.stop();
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+      try {
+        reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.warn("reloadThread fails to join.");
+      }
+    }
+    super.serviceStop();
   }
   
   /**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
     Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
+    float queueMaxAMShareDefault = 1.0f;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -273,6 +288,11 @@ public class AllocationFileLoaderService
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           queueMaxAppsDefault = val;
+        } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.min(val, 1.0f);
+          queueMaxAMShareDefault = val;
         } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
             || "defaultQueueSchedulingMode".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
@@ -297,8 +317,8 @@ public class AllocationFileLoaderService
         parent = null;
       }
       loadQueue(parent, element, minQueueResources, maxQueueResources,
-          queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-          minSharePreemptionTimeouts, queueAcls,
+          queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+          queuePolicies, minSharePreemptionTimeouts, queueAcls,
           configuredQueues);
     }
     
@@ -313,8 +333,8 @@ public class AllocationFileLoaderService
     }
     
     AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
-        queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-        queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+        queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+        queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
         queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
         newPlacementPolicy, configuredQueues);
     
@@ -329,7 +349,8 @@ public class AllocationFileLoaderService
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+      Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
@@ -361,6 +382,11 @@ public class AllocationFileLoaderService
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         queueMaxApps.put(queueName, val);
+      } else if ("maxAMShare".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.min(val, 1.0f);
+        queueMaxAMShares.put(queueName, val);
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
@@ -383,8 +409,9 @@ public class AllocationFileLoaderService
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts, queueAcls, configuredQueues);
+            queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+            queuePolicies, minSharePreemptionTimeouts, queueAcls,
+            configuredQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Sat Jun  7 16:29:10 2014
@@ -267,6 +267,13 @@ public class AppSchedulable extends Sche
       node.allocateContainer(app.getApplicationId(),
           allocatedContainer);
 
+      // If this container is used to run AM, update the leaf queue's AM usage
+      if (app.getLiveContainers().size() == 1 &&
+          !app.getUnmanagedAM()) {
+        queue.addAMResourceUsage(container.getResource());
+        app.setAmRunning(true);
+      }
+
       return container.getResource();
     } else {
       // The desired container won't fit here, so reserve
@@ -297,6 +304,14 @@ public class AppSchedulable extends Sche
         
         app.addSchedulingOpportunity(priority);
 
+        // Check the AM resource usage for the leaf queue
+        if (app.getLiveContainers().size() == 0
+            && !app.getUnmanagedAM()) {
+          if (!queue.canRunAppAM(app.getAMResource())) {
+            return Resources.none();
+          }
+        }
+
         ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
             node.getRackName());
         ResourceRequest localRequest = app.getResourceRequest(priority,

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Sat Jun  7 16:29:10 2014
@@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
+  // Track the AM resource usage for this queue
+  private Resource amResourceUsage;
+
   private final ActiveUsersManager activeUsersManager;
   
   public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,6 +66,7 @@ public class FSLeafQueue extends FSQueue
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
+    amResourceUsage = Resource.newInstance(0, 0);
   }
   
   public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -86,6 +90,10 @@ public class FSLeafQueue extends FSQueue
    */
   public boolean removeApp(FSSchedulerApp app) {
     if (runnableAppScheds.remove(app.getAppSchedulable())) {
+      // Update AM resource usage
+      if (app.isAmRunning() && app.getAMResource() != null) {
+        Resources.subtractFrom(amResourceUsage, app.getAMResource());
+      }
       return true;
     } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
       return false;
@@ -145,6 +153,10 @@ public class FSLeafQueue extends FSQueue
     return usage;
   }
 
+  public Resource getAmResourceUsage() {
+    return amResourceUsage;
+  }
+
   @Override
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
@@ -284,4 +296,26 @@ public class FSLeafQueue extends FSQueue
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
   }
+
+  /**
+   * Check whether this queue can run this application master under the
+   * maxAMShare limit
+   *
+   * @param amResource
+   * @return true if this queue can run
+   */
+  public boolean canRunAppAM(Resource amResource) {
+    float maxAMShare =
+        scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+    return !policy
+        .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+  }
+
+  public void addAMResourceUsage(Resource amResource) {
+    if (amResource != null) {
+      Resources.addTo(amResourceUsage, amResource);
+    }
+  }
 }