You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [9/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Tue Aug 19 23:49:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -34,13 +35,17 @@ import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -92,8 +101,8 @@ public class TestLocalResourcesTrackerIm
       localrsrc.put(req1, lr1);
       localrsrc.put(req2, lr2);
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
-            conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              false, conf, new NMNullStateStoreService());
 
       ResourceEvent req11Event =
           new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -176,7 +185,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       localrsrc.put(req1, lr1);
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, false, conf);
+          null, dispatcher, localrsrc, false, conf,
+          new NMNullStateStoreService());
 
       ResourceEvent req11Event = new ResourceRequestEvent(req1,
           LocalResourceVisibility.PUBLIC, lc1);
@@ -246,7 +256,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true, conf, new NMNullStateStoreService());
 
       LocalResourceRequest lr =
           createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -264,6 +275,7 @@ public class TestLocalResourcesTrackerIm
 
       // Container-1 requesting local resource.
       tracker.handle(reqEvent1);
+      dispatcher.await();
 
       // New localized Resource should have been added to local resource map
       // and the requesting container will be added to its waiting queue.
@@ -280,6 +292,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent2 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
       tracker.handle(reqEvent2);
+      dispatcher.await();
 
       // Container 2 should have been added to the waiting queue of the local
       // resource
@@ -295,6 +308,7 @@ public class TestLocalResourcesTrackerIm
       LocalizedResource localizedResource = localrsrc.get(lr);
       
       tracker.handle(resourceFailedEvent);
+      dispatcher.await();
 
       // After receiving failed resource event; all waiting containers will be
       // notified with Container Resource Failed Event.
@@ -308,6 +322,7 @@ public class TestLocalResourcesTrackerIm
       // exception.
       ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
       tracker.handle(relEvent1);
+      dispatcher.await();
 
       // Container-3 now requests for the same resource. This request call
       // is coming prior to Container-2's release call.
@@ -316,6 +331,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
       tracker.handle(reqEvent3);
+      dispatcher.await();
 
       // Local resource cache now should have the requested resource and the
       // number of waiting containers should be 1.
@@ -327,6 +343,7 @@ public class TestLocalResourcesTrackerIm
       // Container-2 Releases the resource
       ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
       tracker.handle(relEvent2);
+      dispatcher.await();
 
       // Making sure that there is no change in the cache after the release.
       Assert.assertEquals(1, localrsrc.size());
@@ -340,6 +357,7 @@ public class TestLocalResourcesTrackerIm
       ResourceLocalizedEvent localizedEvent =
           new ResourceLocalizedEvent(lr, localizedPath, 123L);
       tracker.handle(localizedEvent);
+      dispatcher.await();
       
       // Verifying ContainerResourceLocalizedEvent .
       verify(containerEventHandler, times(1)).handle(
@@ -351,6 +369,7 @@ public class TestLocalResourcesTrackerIm
       // Container-3 releasing the resource.
       ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
       tracker.handle(relEvent3);
+      dispatcher.await();
       
       Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
       
@@ -384,7 +403,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, true, conf);
+          null, dispatcher, localrsrc, true, conf,
+          new NMNullStateStoreService());
 
       // This is a random path. NO File creation will take place at this place.
       Path localDir = new Path("/tmp");
@@ -401,7 +421,9 @@ public class TestLocalResourcesTrackerIm
       tracker.handle(reqEvent1);
 
       // Simulate the process of localization of lr1
-      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      // NOTE: Localization path from tracker has resource ID at end
+      Path hierarchicalPath1 =
+          tracker.getPathForLocalization(lr1, localDir).getParent();
       // Simulate lr1 getting localized
       ResourceLocalizedEvent rle1 =
           new ResourceLocalizedEvent(lr1,
@@ -417,7 +439,8 @@ public class TestLocalResourcesTrackerIm
           new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent2);
 
-      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      Path hierarchicalPath2 =
+          tracker.getPathForLocalization(lr2, localDir).getParent();
       // localization failed.
       ResourceFailedLocalizationEvent rfe2 =
           new ResourceFailedLocalizationEvent(
@@ -435,7 +458,8 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
           LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent3);
-      Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+      Path hierarchicalPath3 =
+          tracker.getPathForLocalization(lr3, localDir).getParent();
       // localization successful
       ResourceLocalizedEvent rle3 =
           new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@@ -479,6 +503,284 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreSuccessfulLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    DeletionService mockDelService = mock(DeletionService.class);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // Container 1 requests lr1 to be localized
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Simulate the process of localization of lr1
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      // Simulate lr1 getting localized
+      ResourceLocalizedEvent rle1 =
+          new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120);
+      tracker.handle(rle1);
+      dispatcher.await();
+
+      ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor =
+          ArgumentCaptor.forClass(LocalizedResourceProto.class);
+      verify(stateStore).finishResourceLocalization(eq(user), eq(appId),
+          localizedProtoCaptor.capture());
+      LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue();
+      Assert.assertEquals(lr1, new LocalResourceRequest(
+          new LocalResourcePBImpl(localizedProto.getResource())));
+      Assert.assertEquals(localizedPath1.toString(),
+          localizedProto.getLocalPath());
+      LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1);
+      Assert.assertNotNull(localizedRsrc1);
+
+      // simulate release and retention processing
+      tracker.handle(new ResourceReleaseEvent(lr1, cId1));
+      dispatcher.await();
+      boolean removeResult = tracker.remove(localizedRsrc1, mockDelService);
+
+      Assert.assertTrue(removeResult);
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreFailedLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // Container 1 requests lr1 to be localized
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Simulate the process of localization of lr1
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      ResourceFailedLocalizationEvent rfe1 =
+          new ResourceFailedLocalizationEvent(
+              lr1, new Exception("Test").toString());
+      tracker.handle(rfe1);
+      dispatcher.await();
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResource() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDir,
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+
+      // verify new paths reflect recovery of previous resources
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc2 = new LocalizerContext(user, cId1, null);
+      ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
+          LocalResourceVisibility.APPLICATION, lc2);
+      tracker.handle(reqEvent2);
+      dispatcher.await();
+      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
+      Assert.assertEquals(localizedId1 + 1, localizedId2);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResourceWithDirCacheMgr() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDirRoot = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, true, conf, stateStore);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+      LocalCacheDirectoryManager dirMgrRoot =
+          tracker.getDirectoryManager(localDirRoot);
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr2));
+      final long localizedId2 = localizedId1 + 1;
+      Path hierarchicalPath2 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId2));
+      Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr2));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr3));
+      final long localizedId3 = 128;
+      Path hierarchicalPath3 = new Path(localDirRoot + "/4/3",
+          Long.toString(localizedId3));
+      Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr3));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+
+      LocalResourceRequest lr4 = createLocalResourceRequest(user, 4, 4,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr4));
+      final long localizedId4 = 256;
+      Path hierarchicalPath4 = new Path(localDirRoot + "/4",
+          Long.toString(localizedId4));
+      Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr4));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
@@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -188,7 +194,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService locService =
       spy(new ResourceLocalizationService(dispatcher, exec, delService,
-                                          diskhandler));
+                                          diskhandler,
+                                          new NMNullStateStoreService()));
     doReturn(lfs)
       .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -253,7 +260,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -287,7 +295,7 @@ public class TestResourceLocalizationSer
               user, appId);
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
       
       // init resources
       Random r = new Random();
@@ -402,6 +410,233 @@ public class TestResourceLocalizationSer
     }
   }
   
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testRecovery() throws Exception {
+    final String user1 = "user1";
+    final String user2 = "user2";
+    final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    final ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    //Ignore actual localization
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    ResourceLocalizationService spyService =
+        createSpyService(dispatcher, dirsHandler, stateStore);
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final Application app1 = mock(Application.class);
+      when(app1.getUser()).thenReturn(user1);
+      when(app1.getAppId()).thenReturn(appId1);
+      final Application app2 = mock(Application.class);
+      when(app2.getUser()).thenReturn(user2);
+      when(app2.getAppId()).thenReturn(appId2);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
+      dispatcher.await();
+
+      //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
+      LocalResourcesTracker appTracker1 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      LocalResourcesTracker privTracker1 =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+              user1, null);
+      LocalResourcesTracker appTracker2 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+              null, null);
+
+      // init containers
+      final Container c1 = getMockContainer(appId1, 1, user1);
+      final Container c2 = getMockContainer(appId2, 2, user2);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+
+      // Send localization requests of each type.
+      final LocalResource privResource1 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq1 =
+          new LocalResourceRequest(privResource1);
+      final LocalResource privResource2 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq2 =
+          new LocalResourceRequest(privResource2);
+
+      final LocalResource pubResource1 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq1 =
+          new LocalResourceRequest(pubResource1);
+      final LocalResource pubResource2 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq2 =
+          new LocalResourceRequest(pubResource2);
+
+      final LocalResource appResource1 = getAppMockedResource(r);
+      final LocalResourceRequest appReq1 =
+          new LocalResourceRequest(appResource1);
+      final LocalResource appResource2 = getAppMockedResource(r);
+      final LocalResourceRequest appReq2 =
+          new LocalResourceRequest(appResource2);
+      final LocalResource appResource3 = getAppMockedResource(r);
+      final LocalResourceRequest appReq3 =
+          new LocalResourceRequest(appResource3);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
+          new HashMap<LocalResourceVisibility,
+                      Collection<LocalResourceRequest>>();
+      req1.put(LocalResourceVisibility.PRIVATE,
+          Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
+      req1.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq1));
+      req1.put(LocalResourceVisibility.APPLICATION,
+          Collections.singletonList(appReq1));
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+        new HashMap<LocalResourceVisibility,
+                    Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.APPLICATION,
+          Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
+      req2.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq2));
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
+      spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
+      dispatcher.await();
+
+      // Simulate start of localization for all resources
+      privTracker1.getPathForLocalization(privReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      privTracker1.getPathForLocalization(privReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
+      LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
+      appTracker1.getPathForLocalization(appReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId1));
+      LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
+      appTracker2.getPathForLocalization(appReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
+      appTracker2.getPathForLocalization(appReq3,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
+      pubTracker.getPathForLocalization(pubReq1,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
+      pubTracker.getPathForLocalization(pubReq2,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
+
+      // Simulate completion of localization for most resources with
+      // possibly different sizes than in the request
+      assertNotNull("Localization not started", privLr1.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq1,
+          privLr1.getLocalPath(), privLr1.getSize() + 5));
+      assertNotNull("Localization not started", privLr2.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq2,
+          privLr2.getLocalPath(), privLr2.getSize() + 10));
+      assertNotNull("Localization not started", appLr1.getLocalPath());
+      appTracker1.handle(new ResourceLocalizedEvent(appReq1,
+          appLr1.getLocalPath(), appLr1.getSize()));
+      assertNotNull("Localization not started", appLr3.getLocalPath());
+      appTracker2.handle(new ResourceLocalizedEvent(appReq3,
+          appLr3.getLocalPath(), appLr3.getSize() + 7));
+      assertNotNull("Localization not started", pubLr1.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
+          pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
+      assertNotNull("Localization not started", pubLr2.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
+          pubLr2.getLocalPath(), pubLr2.getSize() + 99999));
+
+      dispatcher.await();
+      assertEquals(ResourceState.LOCALIZED, privLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, privLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr1.getState());
+      assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr3.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr2.getState());
+
+      // restart and recover
+      spyService = createSpyService(dispatcher, dirsHandler, stateStore);
+      spyService.init(conf);
+      spyService.recoverLocalizedResources(
+          stateStore.loadLocalizationState());
+      dispatcher.await();
+
+      appTracker1 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      privTracker1 = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PRIVATE, user1, null);
+      appTracker2 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      pubTracker = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PUBLIC, null, null);
+
+      LocalizedResource recoveredRsrc =
+          privTracker1.getLocalizedResource(privReq1);
+      assertEquals(privReq1, recoveredRsrc.getRequest());
+      assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
+      assertEquals(privReq2, recoveredRsrc.getRequest());
+      assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
+      assertEquals(appReq1, recoveredRsrc.getRequest());
+      assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
+      assertNull("in-progress resource should not be present", recoveredRsrc);
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
+      assertEquals(appReq3, recoveredRsrc.getRequest());
+      assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+    } finally {
+      dispatcher.stop();
+      stateStore.close();
+    }
+  }
+
   @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
@@ -436,7 +671,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -469,7 +705,7 @@ public class TestResourceLocalizationSer
       long seed = r.nextLong();
       System.out.println("SEED: " + seed);
       r.setSeed(seed);
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, "user0");
       FSDataOutputStream out =
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
@@ -616,7 +852,8 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-                                        dirsHandler);
+                                        dirsHandler,
+                                        new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -637,7 +874,7 @@ public class TestResourceLocalizationSer
       dispatcher.await();
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // init resources
       Random r = new Random();
@@ -725,7 +962,7 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-            dirsHandlerSpy);
+            dirsHandlerSpy, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -758,7 +995,7 @@ public class TestResourceLocalizationSer
         .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // first test ioexception
       Mockito
@@ -838,7 +1075,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -991,7 +1228,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1157,7 +1394,7 @@ public class TestResourceLocalizationSer
       // it as otherwise it will remove requests from pending queue.
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            dirsHandler);
+            dirsHandler, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       dispatcher1.register(LocalizationEventType.class, spyService);
       spyService.init(conf);
@@ -1424,12 +1661,13 @@ public class TestResourceLocalizationSer
     return getMockedResource(r, LocalResourceVisibility.PRIVATE);
   }
 
-  private static Container getMockContainer(ApplicationId appId, int id) {
+  private static Container getMockContainer(ApplicationId appId, int id,
+      String user) {
     Container c = mock(Container.class);
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(appId, 1);
     ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
-    when(c.getUser()).thenReturn("user0");
+    when(c.getUser()).thenReturn(user);
     when(c.getContainerId()).thenReturn(cId);
     Credentials creds = new Credentials();
     creds.addToken(new Text("tok" + id), getToken(id));
@@ -1438,6 +1676,24 @@ public class TestResourceLocalizationSer
     return c;
   }
 
+  private ResourceLocalizationService createSpyService(
+      DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
+      NMStateStoreService stateStore) {
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DeletionService delService = mock(DeletionService.class);
+    ResourceLocalizationService rawService =
+      new ResourceLocalizationService(dispatcher, exec, delService,
+                                      dirsHandler, stateStore);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+        isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+        .getLocalFileContext(isA(Configuration.class));
+    return spyService;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   static Token<? extends TokenIdentifier> getToken(int id) {
     return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Tue Aug 19 23:49:39 2014
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 import org.mockito.ArgumentCaptor;
+
 import static org.mockito.Mockito.*;
 
 public class TestResourceRetention {
@@ -81,7 +83,7 @@ public class TestResourceRetention {
     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
       new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-          trackerResources, false, conf));
+      null, trackerResources, false, conf, new NMNullStateStoreService()));
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Tue Aug 19 23:49:39 2014
@@ -34,8 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.event.Asyn
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.util.Linux
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.TestProcfsBasedProcessTree;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -87,6 +87,7 @@ public class TestContainersMonitor exten
     conf.setClass(
         YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
         LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
     super.setup();
   }
 
@@ -230,7 +231,8 @@ public class TestContainersMonitor exten
     Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
     ContainerTokenIdentifier containerIdentifier =
         new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
-          r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER);
+          r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER,
+          Priority.newInstance(0), 0);
     Token containerToken =
         BuilderUtils.newContainerToken(context.getNodeId(),
           containerManager.getContext().getContainerTokenSecretManager()
@@ -270,7 +272,7 @@ public class TestContainersMonitor exten
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
         containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
-    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+    Assert.assertEquals(ContainerExitStatus.KILLED_EXCEEDED_VMEM,
         containerStatus.getExitStatus());
     String expectedMsgPattern =
         "Container \\[pid=" + pid + ",containerID=" + cId

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java Tue Aug 19 23:49:39 2014
@@ -31,8 +31,11 @@ public class TestNodeManagerMetrics {
     NodeManagerMetrics metrics = NodeManagerMetrics.create();
     Resource total = Records.newRecord(Resource.class);
     total.setMemory(8*GiB);
+    total.setVirtualCores(16);
     Resource resource = Records.newRecord(Resource.class);
     resource.setMemory(1*GiB);
+    resource.setVirtualCores(2);
+
 
     metrics.addResource(total);
 
@@ -57,12 +60,12 @@ public class TestNodeManagerMetrics {
     metrics.initingContainer();
     metrics.runningContainer();
 
-    checkMetrics(5, 1, 1, 1, 1, 1, 2, 2, 6);
+    checkMetrics(5, 1, 1, 1, 1, 1, 2, 2, 6, 4, 12);
   }
 
   private void checkMetrics(int launched, int completed, int failed, int killed,
                             int initing, int running, int allocatedGB,
-                            int allocatedContainers, int availableGB) {
+                            int allocatedContainers, int availableGB, int allocatedVCores, int availableVCores) {
     MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
     assertCounter("ContainersLaunched", launched, rb);
     assertCounter("ContainersCompleted", completed, rb);
@@ -71,7 +74,10 @@ public class TestNodeManagerMetrics {
     assertGauge("ContainersIniting", initing, rb);
     assertGauge("ContainersRunning", running, rb);
     assertGauge("AllocatedGB", allocatedGB, rb);
+    assertGauge("AllocatedVCores", allocatedVCores, rb);
     assertGauge("AllocatedContainers", allocatedContainers, rb);
     assertGauge("AvailableGB", availableGB, rb);
+    assertGauge("AvailableVCores",availableVCores, rb);
+
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public class MockContainer implements Co
   public ContainerTokenIdentifier getContainerTokenIdentifier() {
     return this.containerTokenIdentifier;
   }
+
+  @Override
+  public NMContainerStatus getNMContainerStatus() {
+    return null;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java Tue Aug 19 23:49:39 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -77,7 +78,8 @@ public class TestContainerLogsPage {
     NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
-    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf));
+    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+        new ApplicationACLsManager(conf), new NMNullStateStoreService());
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Aug 19 23:49:39 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,7 +79,8 @@ public class TestNMWebServer {
   }
   
   private int startNMWebAppServer(String webAddr) {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -88,6 +91,10 @@ public class TestNMWebServer {
         return 0;
       }
       @Override
+      public long getVCoresAllocatedForContainers() {
+        return 0;
+      }
+      @Override
       public boolean isVmemCheckEnabled() {
         return true;
       }
@@ -135,7 +142,8 @@ public class TestNMWebServer {
 
   @Test
   public void testNMWebApp() throws IOException, YarnException {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -146,6 +154,10 @@ public class TestNMWebServer {
         return 0;
       }
       @Override
+      public long getVCoresAllocatedForContainers() {
+        return 0;
+      }
+      @Override
       public boolean isVmemCheckEnabled() {
         return true;
       }
@@ -185,6 +197,7 @@ public class TestNMWebServer {
     ContainerId container2 =
         BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
     NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
+    NMStateStoreService stateStore = new NMNullStateStoreService();
     for (ContainerId containerId : new ContainerId[] { container1,
         container2}) {
       // TODO: Use builder utils
@@ -196,7 +209,7 @@ public class TestNMWebServer {
             BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
             "password".getBytes(), currentTime);
       Container container =
-          new ContainerImpl(conf, dispatcher, launchContext,
+          new ContainerImpl(conf, dispatcher, stateStore, launchContext,
             null, metrics,
             BuilderUtils.newContainerTokenIdentifier(containerToken)) {
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java Tue Aug 19 23:49:39 2014
@@ -107,7 +107,8 @@ public class TestNMWebServices extends J
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {
@@ -123,6 +124,10 @@ public class TestNMWebServices extends J
           return new Long("17179869184");
         }
         @Override
+        public long getVCoresAllocatedForContainers() {
+          return new Long("4000");
+        }
+        @Override
         public boolean isVmemCheckEnabled() {
           return true;
         }
@@ -374,6 +379,8 @@ public class TestNMWebServices extends J
               "totalVmemAllocatedContainersMB"),
           WebServicesTestUtils.getXmlLong(element,
               "totalPmemAllocatedContainersMB"),
+          WebServicesTestUtils.getXmlLong(element,
+              "totalVCoresAllocatedContainers"),
           WebServicesTestUtils.getXmlBoolean(element, "vmemCheckEnabled"),
           WebServicesTestUtils.getXmlBoolean(element, "pmemCheckEnabled"),
           WebServicesTestUtils.getXmlLong(element, "lastNodeUpdateTime"),
@@ -392,10 +399,11 @@ public class TestNMWebServices extends J
   public void verifyNodeInfo(JSONObject json) throws JSONException, Exception {
     assertEquals("incorrect number of elements", 1, json.length());
     JSONObject info = json.getJSONObject("nodeInfo");
-    assertEquals("incorrect number of elements", 15, info.length());
+    assertEquals("incorrect number of elements", 16, info.length());
     verifyNodeInfoGeneric(info.getString("id"), info.getString("healthReport"),
         info.getLong("totalVmemAllocatedContainersMB"),
         info.getLong("totalPmemAllocatedContainersMB"),
+        info.getLong("totalVCoresAllocatedContainers"),
         info.getBoolean("vmemCheckEnabled"),
         info.getBoolean("pmemCheckEnabled"),
         info.getLong("lastNodeUpdateTime"), info.getBoolean("nodeHealthy"),
@@ -409,6 +417,7 @@ public class TestNMWebServices extends J
 
   public void verifyNodeInfoGeneric(String id, String healthReport,
       long totalVmemAllocatedContainersMB, long totalPmemAllocatedContainersMB,
+      long totalVCoresAllocatedContainers,
       boolean vmemCheckEnabled, boolean pmemCheckEnabled,
       long lastNodeUpdateTime, Boolean nodeHealthy, String nodeHostName,
       String hadoopVersionBuiltOn, String hadoopBuildVersion,
@@ -422,6 +431,8 @@ public class TestNMWebServices extends J
         totalVmemAllocatedContainersMB);
     assertEquals("totalPmemAllocatedContainersMB incorrect", 16384,
         totalPmemAllocatedContainersMB);
+    assertEquals("totalVCoresAllocatedContainers incorrect", 4000,
+        totalVCoresAllocatedContainers);
     assertEquals("vmemCheckEnabled incorrect",  true, vmemCheckEnabled);
     assertEquals("pmemCheckEnabled incorrect",  true, pmemCheckEnabled);
     assertTrue("lastNodeUpdateTime incorrect", lastNodeUpdateTime == nmContext

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Tue Aug 19 23:49:39 2014
@@ -99,7 +99,8 @@ public class TestNMWebServicesApps exten
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {
@@ -116,6 +117,12 @@ public class TestNMWebServicesApps exten
         }
 
         @Override
+        public long getVCoresAllocatedForContainers() {
+          return new Long("4000");
+        }
+
+
+        @Override
         public boolean isVmemCheckEnabled() {
           return true;
         }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Tue Aug 19 23:49:39 2014
@@ -107,6 +107,11 @@ public class TestNMWebServicesContainers
         }
 
         @Override
+        public long getVCoresAllocatedForContainers() {
+          return new Long("4000");
+        }
+
+        @Override
         public boolean isVmemCheckEnabled() {
           return true;
         }
@@ -122,7 +127,8 @@ public class TestNMWebServicesContainers
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };
@@ -460,24 +466,27 @@ public class TestNMWebServicesContainers
           WebServicesTestUtils.getXmlString(element, "diagnostics"),
           WebServicesTestUtils.getXmlString(element, "nodeId"),
           WebServicesTestUtils.getXmlInt(element, "totalMemoryNeededMB"),
+          WebServicesTestUtils.getXmlInt(element, "totalVCoresNeeded"),
           WebServicesTestUtils.getXmlString(element, "containerLogsLink"));
     }
   }
 
   public void verifyNodeContainerInfo(JSONObject info, Container cont)
       throws JSONException, Exception {
-    assertEquals("incorrect number of elements", 8, info.length());
+    assertEquals("incorrect number of elements", 9, info.length());
 
     verifyNodeContainerInfoGeneric(cont, info.getString("id"),
         info.getString("state"), info.getString("user"),
         info.getInt("exitCode"), info.getString("diagnostics"),
         info.getString("nodeId"), info.getInt("totalMemoryNeededMB"),
+        info.getInt("totalVCoresNeeded"),
         info.getString("containerLogsLink"));
   }
 
   public void verifyNodeContainerInfoGeneric(Container cont, String id,
       String state, String user, int exitCode, String diagnostics,
-      String nodeId, int totalMemoryNeededMB, String logsLink)
+      String nodeId, int totalMemoryNeededMB, int totalVCoresNeeded,
+      String logsLink)
       throws JSONException, Exception {
     WebServicesTestUtils.checkStringMatch("id", cont.getContainerId()
         .toString(), id);
@@ -494,6 +503,9 @@ public class TestNMWebServicesContainers
     assertEquals("totalMemoryNeededMB wrong",
       YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
       totalMemoryNeededMB);
+    assertEquals("totalVCoresNeeded wrong",
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+      totalVCoresNeeded);
     String shortLink =
         ujoin("containerlogs", cont.getContainerId().toString(),
             cont.getUser());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml Tue Aug 19 23:49:39 2014
@@ -108,4 +108,27 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings</name>
+    <value></value>
+    <description>
+      A list of mappings that will be used to assign jobs to queues
+      The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
+      Typically this list will be used to map users to queues,
+      for example, u:%user:%user maps all users to queues with the same name
+      as the user.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
+    <value>false</value>
+    <description>
+      If a queue mapping is present, will it override the value specified
+      by the user? This can be used by administrators to place jobs in queues
+      that are different than the one specified by the user.
+      The default is false.
+    </description>
+  </property>
+
 </configuration>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Tue Aug 19 23:49:39 2014
@@ -42,24 +42,6 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
@@ -212,6 +194,21 @@
 
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-grizzly2</artifactId>
       <scope>test</scope>
@@ -247,6 +244,37 @@
           </execution>
         </executions>
       </plugin>
+
+     <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+                <param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
+                <param>${basedir}/../hadoop-yarn-server-common/src/main/proto</param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+		          <include>yarn_server_resourcemanager_recovery.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -86,9 +87,12 @@ public class AdminService extends Compos
   private String rmId;
 
   private boolean autoFailoverEnabled;
+  private EmbeddedElectorService embeddedElector;
 
   private Server server;
-  private InetSocketAddress masterServiceAddress;
+
+  // Address to use for binding. May be a wildcard address.
+  private InetSocketAddress masterServiceBindAddress;
   private AccessControlList adminAcl;
 
   private final RecordFactory recordFactory = 
@@ -101,20 +105,23 @@ public class AdminService extends Compos
   }
 
   @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
+  public void serviceInit(Configuration conf) throws Exception {
     if (rmContext.isHAEnabled()) {
       autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
       if (autoFailoverEnabled) {
         if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
-          addIfService(createEmbeddedElectorService());
+          embeddedElector = createEmbeddedElectorService();
+          addIfService(embeddedElector);
         }
       }
     }
 
-    masterServiceAddress = conf.getSocketAddr(
+    masterServiceBindAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+
     adminAcl = new AccessControlList(conf.get(
         YarnConfiguration.YARN_ADMIN_ACL,
         YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
@@ -123,13 +130,13 @@ public class AdminService extends Compos
   }
 
   @Override
-  protected synchronized void serviceStart() throws Exception {
+  protected void serviceStart() throws Exception {
     startServer();
     super.serviceStart();
   }
 
   @Override
-  protected synchronized void serviceStop() throws Exception {
+  protected void serviceStop() throws Exception {
     stopServer();
     super.serviceStop();
   }
@@ -138,7 +145,7 @@ public class AdminService extends Compos
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
     this.server = (Server) rpc.getServer(
-        ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
+        ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress,
         conf, null,
         conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
             YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
@@ -167,8 +174,10 @@ public class AdminService extends Compos
     }
 
     this.server.start();
-    conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
-        server.getListenerAddress());
+    conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                           YarnConfiguration.RM_ADMIN_ADDRESS,
+                           YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+                           server.getListenerAddress());
   }
 
   protected void stopServer() throws Exception {
@@ -181,6 +190,13 @@ public class AdminService extends Compos
     return new EmbeddedElectorService(rmContext);
   }
 
+  @InterfaceAudience.Private
+  void resetLeaderElection() {
+    if (embeddedElector != null) {
+      embeddedElector.resetLeaderElection();
+    }
+  }
+
   private UserGroupInformation checkAccess(String method) throws IOException {
     return RMServerUtils.verifyAccess(adminAcl, method, LOG);
   }