You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/07 18:29:20 UTC
svn commit: r1601151 [3/5] - in
/hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yar...
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Sat Jun 7 16:29:10 2014
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@@ -188,7 +194,8 @@ public class TestResourceLocalizationSer
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
- diskhandler));
+ diskhandler,
+ new NMNullStateStoreService()));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@@ -253,7 +260,8 @@ public class TestResourceLocalizationSer
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler);
+ dirsHandler,
+ new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -287,7 +295,7 @@ public class TestResourceLocalizationSer
user, appId);
// init container.
- final Container c = getMockContainer(appId, 42);
+ final Container c = getMockContainer(appId, 42, user);
// init resources
Random r = new Random();
@@ -402,6 +410,233 @@ public class TestResourceLocalizationSer
}
}
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testRecovery() throws Exception {
+ final String user1 = "user1";
+ final String user2 = "user2";
+ final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+ final ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+ NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ //Ignore actual localization
+ EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerBus);
+
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
+ ResourceLocalizationService spyService =
+ createSpyService(dispatcher, dirsHandler, stateStore);
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ final Application app1 = mock(Application.class);
+ when(app1.getUser()).thenReturn(user1);
+ when(app1.getAppId()).thenReturn(appId1);
+ final Application app2 = mock(Application.class);
+ when(app2.getUser()).thenReturn(user2);
+ when(app2.getAppId()).thenReturn(appId2);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
+ dispatcher.await();
+
+ // Get a handle on the trackers set up by INIT_APPLICATION_RESOURCES
+ LocalResourcesTracker appTracker1 =
+ spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user1, appId1);
+ LocalResourcesTracker privTracker1 =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+ user1, null);
+ LocalResourcesTracker appTracker2 =
+ spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user2, appId2);
+ LocalResourcesTracker pubTracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+ null, null);
+
+ // init containers
+ final Container c1 = getMockContainer(appId1, 1, user1);
+ final Container c2 = getMockContainer(appId2, 2, user2);
+
+ // init resources
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+
+ // Send localization requests of each type.
+ final LocalResource privResource1 = getPrivateMockedResource(r);
+ final LocalResourceRequest privReq1 =
+ new LocalResourceRequest(privResource1);
+ final LocalResource privResource2 = getPrivateMockedResource(r);
+ final LocalResourceRequest privReq2 =
+ new LocalResourceRequest(privResource2);
+
+ final LocalResource pubResource1 = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq1 =
+ new LocalResourceRequest(pubResource1);
+ final LocalResource pubResource2 = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq2 =
+ new LocalResourceRequest(pubResource2);
+
+ final LocalResource appResource1 = getAppMockedResource(r);
+ final LocalResourceRequest appReq1 =
+ new LocalResourceRequest(appResource1);
+ final LocalResource appResource2 = getAppMockedResource(r);
+ final LocalResourceRequest appReq2 =
+ new LocalResourceRequest(appResource2);
+ final LocalResource appResource3 = getAppMockedResource(r);
+ final LocalResourceRequest appReq3 =
+ new LocalResourceRequest(appResource3);
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req1.put(LocalResourceVisibility.PRIVATE,
+ Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
+ req1.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq1));
+ req1.put(LocalResourceVisibility.APPLICATION,
+ Collections.singletonList(appReq1));
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req2.put(LocalResourceVisibility.APPLICATION,
+ Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
+ req2.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq2));
+
+ // Send Request event
+ spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
+ spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
+ dispatcher.await();
+
+ // Simulate start of localization for all resources
+ privTracker1.getPathForLocalization(privReq1,
+ dirsHandler.getLocalPathForWrite(
+ ContainerLocalizer.USERCACHE + user1));
+ privTracker1.getPathForLocalization(privReq2,
+ dirsHandler.getLocalPathForWrite(
+ ContainerLocalizer.USERCACHE + user1));
+ LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
+ LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
+ appTracker1.getPathForLocalization(appReq1,
+ dirsHandler.getLocalPathForWrite(
+ ContainerLocalizer.APPCACHE + appId1));
+ LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
+ appTracker2.getPathForLocalization(appReq2,
+ dirsHandler.getLocalPathForWrite(
+ ContainerLocalizer.APPCACHE + appId2));
+ LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
+ appTracker2.getPathForLocalization(appReq3,
+ dirsHandler.getLocalPathForWrite(
+ ContainerLocalizer.APPCACHE + appId2));
+ LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
+ pubTracker.getPathForLocalization(pubReq1,
+ dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+ LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
+ pubTracker.getPathForLocalization(pubReq2,
+ dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+ LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
+
+ // Simulate completion of localization for most resources with
+ // possibly different sizes than in the request
+ assertNotNull("Localization not started", privLr1.getLocalPath());
+ privTracker1.handle(new ResourceLocalizedEvent(privReq1,
+ privLr1.getLocalPath(), privLr1.getSize() + 5));
+ assertNotNull("Localization not started", privLr2.getLocalPath());
+ privTracker1.handle(new ResourceLocalizedEvent(privReq2,
+ privLr2.getLocalPath(), privLr2.getSize() + 10));
+ assertNotNull("Localization not started", appLr1.getLocalPath());
+ appTracker1.handle(new ResourceLocalizedEvent(appReq1,
+ appLr1.getLocalPath(), appLr1.getSize()));
+ assertNotNull("Localization not started", appLr3.getLocalPath());
+ appTracker2.handle(new ResourceLocalizedEvent(appReq3,
+ appLr3.getLocalPath(), appLr3.getSize() + 7));
+ assertNotNull("Localization not started", pubLr1.getLocalPath());
+ pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
+ pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
+ assertNotNull("Localization not started", pubLr2.getLocalPath());
+ pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
+ pubLr2.getLocalPath(), pubLr2.getSize() + 99999));
+
+ dispatcher.await();
+ assertEquals(ResourceState.LOCALIZED, privLr1.getState());
+ assertEquals(ResourceState.LOCALIZED, privLr2.getState());
+ assertEquals(ResourceState.LOCALIZED, appLr1.getState());
+ assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
+ assertEquals(ResourceState.LOCALIZED, appLr3.getState());
+ assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
+ assertEquals(ResourceState.LOCALIZED, pubLr2.getState());
+
+ // restart and recover
+ spyService = createSpyService(dispatcher, dirsHandler, stateStore);
+ spyService.init(conf);
+ spyService.recoverLocalizedResources(
+ stateStore.loadLocalizationState());
+ dispatcher.await();
+
+ appTracker1 = spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user1, appId1);
+ privTracker1 = spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.PRIVATE, user1, null);
+ appTracker2 = spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user2, appId2);
+ pubTracker = spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.PUBLIC, null, null);
+
+ LocalizedResource recoveredRsrc =
+ privTracker1.getLocalizedResource(privReq1);
+ assertEquals(privReq1, recoveredRsrc.getRequest());
+ assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+ assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
+ assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+ recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
+ assertEquals(privReq2, recoveredRsrc.getRequest());
+ assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
+ assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
+ assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+ recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
+ assertEquals(appReq1, recoveredRsrc.getRequest());
+ assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+ assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
+ assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+ recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
+ assertNull("in-progress resource should not be present", recoveredRsrc);
+ recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
+ assertEquals(appReq3, recoveredRsrc.getRequest());
+ assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
+ assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
+ assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+ } finally {
+ dispatcher.stop();
+ stateStore.close();
+ }
+ }
+
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
@@ -436,7 +671,8 @@ public class TestResourceLocalizationSer
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler);
+ dirsHandler,
+ new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -469,7 +705,7 @@ public class TestResourceLocalizationSer
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
- final Container c = getMockContainer(appId, 42);
+ final Container c = getMockContainer(appId, 42, "user0");
FSDataOutputStream out =
new FSDataOutputStream(new DataOutputBuffer(), null);
doReturn(out).when(spylfs).createInternal(isA(Path.class),
@@ -616,7 +852,8 @@ public class TestResourceLocalizationSer
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler);
+ dirsHandler,
+ new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@@ -637,7 +874,7 @@ public class TestResourceLocalizationSer
dispatcher.await();
// init container.
- final Container c = getMockContainer(appId, 42);
+ final Container c = getMockContainer(appId, 42, user);
// init resources
Random r = new Random();
@@ -725,7 +962,7 @@ public class TestResourceLocalizationSer
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandlerSpy);
+ dirsHandlerSpy, new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@@ -758,7 +995,7 @@ public class TestResourceLocalizationSer
.put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
// init container.
- final Container c = getMockContainer(appId, 42);
+ final Container c = getMockContainer(appId, 42, user);
// first test ioexception
Mockito
@@ -838,7 +1075,7 @@ public class TestResourceLocalizationSer
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
- localDirHandler);
+ localDirHandler, new NMNullStateStoreService());
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@@ -991,7 +1228,7 @@ public class TestResourceLocalizationSer
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
- localDirHandler);
+ localDirHandler, new NMNullStateStoreService());
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@@ -1157,7 +1394,7 @@ public class TestResourceLocalizationSer
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
- dirsHandler);
+ dirsHandler, new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
@@ -1424,12 +1661,13 @@ public class TestResourceLocalizationSer
return getMockedResource(r, LocalResourceVisibility.PRIVATE);
}
- private static Container getMockContainer(ApplicationId appId, int id) {
+ private static Container getMockContainer(ApplicationId appId, int id,
+ String user) {
Container c = mock(Container.class);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
- when(c.getUser()).thenReturn("user0");
+ when(c.getUser()).thenReturn(user);
when(c.getContainerId()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
@@ -1438,6 +1676,24 @@ public class TestResourceLocalizationSer
return c;
}
+ private ResourceLocalizationService createSpyService(
+ DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
+ NMStateStoreService stateStore) {
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+ DeletionService delService = mock(DeletionService.class);
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandler, stateStore);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(mockServer).when(spyService).createServer();
+ doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+ isA(Configuration.class));
+ doReturn(lfs).when(spyService)
+ .getLocalFileContext(isA(Configuration.class));
+ return spyService;
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
static Token<? extends TokenIdentifier> getToken(int id) {
return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Sat Jun 7 16:29:10 2014
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.junit.Test;
+
import static org.junit.Assert.*;
import org.mockito.ArgumentCaptor;
+
import static org.mockito.Mockito.*;
public class TestResourceRetention {
@@ -81,7 +83,7 @@ public class TestResourceRetention {
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
- trackerResources, false, conf));
+ null, trackerResources, false, conf, new NMNullStateStoreService()));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Sat Jun 7 16:29:10 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public class MockContainer implements Co
public ContainerTokenIdentifier getContainerTokenIdentifier() {
return this.containerTokenIdentifier;
}
+
+ @Override
+ public NMContainerStatus getNMContainerStatus() {
+ return null;
+ }
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java Sat Jun 7 16:29:10 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -77,7 +78,8 @@ public class TestContainerLogsPage {
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
- NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf));
+ NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+ new ApplicationACLsManager(conf), new NMNullStateStoreService());
// Add an application and the corresponding containers
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
String user = "nobody";
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Sat Jun 7 16:29:10 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,7 +79,8 @@ public class TestNMWebServer {
}
private int startNMWebAppServer(String webAddr) {
- Context nmContext = new NodeManager.NMContext(null, null, null, null);
+ Context nmContext = new NodeManager.NMContext(null, null, null, null,
+ null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@@ -135,7 +138,8 @@ public class TestNMWebServer {
@Test
public void testNMWebApp() throws IOException, YarnException {
- Context nmContext = new NodeManager.NMContext(null, null, null, null);
+ Context nmContext = new NodeManager.NMContext(null, null, null, null,
+ null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@@ -185,6 +189,7 @@ public class TestNMWebServer {
ContainerId container2 =
BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
+ NMStateStoreService stateStore = new NMNullStateStoreService();
for (ContainerId containerId : new ContainerId[] { container1,
container2}) {
// TODO: Use builder utils
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java Sat Jun 7 16:29:10 2014
@@ -107,7 +107,8 @@ public class TestNMWebServices extends J
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
- nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+ nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+ aclsManager, null);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Sat Jun 7 16:29:10 2014
@@ -99,7 +99,8 @@ public class TestNMWebServicesApps exten
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
- nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+ nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+ aclsManager, null);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Sat Jun 7 16:29:10 2014
@@ -122,7 +122,8 @@ public class TestNMWebServicesContainers
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
- nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
+ nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+ aclsManager, null) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sat Jun 7 16:29:10 2014
@@ -401,6 +401,8 @@ public class ResourceManager extends Com
// Initialize the scheduler
scheduler = createScheduler();
+ scheduler.setRMContext(rmContext);
+ addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +431,6 @@ public class ResourceManager extends Com
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
- try {
- scheduler.reinitialize(conf, rmContext);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to initialize scheduler", ioe);
- }
-
// creating monitors that handle preemption
createPolicyMonitors();
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Sat Jun 7 16:29:10 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -195,7 +195,7 @@ public class ResourceTrackerService exte
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
- void handleContainerStatus(ContainerStatus containerStatus) {
+ void handleNMContainerStatus(NMContainerStatus containerStatus) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
@@ -219,11 +219,14 @@ public class ResourceTrackerService exte
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
- && containerStatus.getState() == ContainerState.COMPLETE) {
+ && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(containerStatus.getContainerId(),
+ containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+ containerStatus.getContainerExitStatus());
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
- new RMAppAttemptContainerFinishedEvent(appAttemptId,
- containerStatus);
+ new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
@@ -240,11 +243,11 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!request.getContainerStatuses().isEmpty()) {
+ if (!request.getNMContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
- + request.getContainerStatuses());
- for (ContainerStatus containerStatus : request.getContainerStatuses()) {
- handleContainerStatus(containerStatus);
+ + request.getNMContainerStatuses());
+ for (NMContainerStatus report : request.getNMContainerStatuses()) {
+ handleNMContainerStatus(report);
}
}
RegisterNodeManagerResponse response = recordFactory
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sat Jun 7 16:29:10 2014
@@ -90,7 +90,9 @@ public class ZKRMStateStore extends RMSt
private String zkHostPort = null;
private int zkSessionTimeout;
- private long zkRetryInterval;
+
+ @VisibleForTesting
+ long zkRetryInterval;
private List<ACL> zkAcl;
private List<ZKUtil.ZKAuthInfo> zkAuths;
@@ -199,9 +201,14 @@ public class ZKRMStateStore extends RMSt
zkSessionTimeout =
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- zkRetryInterval =
- conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+ if (HAUtil.isHAEnabled(conf)) {
+ zkRetryInterval = zkSessionTimeout / numRetries;
+ } else {
+ zkRetryInterval =
+ conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+ }
zkAcl = RMZKUtils.getZKAcls(conf);
zkAuths = RMZKUtils.getZKAuths(conf);
@@ -539,7 +546,7 @@ public class ZKRMStateStore extends RMSt
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
- LOG.info("Done Loading applications from ZK state store");
+ LOG.debug("Done Loading applications from ZK state store");
}
@Override
@@ -572,7 +579,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
}
@@ -615,7 +622,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
}
@@ -664,7 +671,7 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
opList.add(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
doMultiWithRetries(opList);
}
@@ -681,7 +688,7 @@ public class ZKRMStateStore extends RMSt
// in case znode doesn't exist
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
- LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
// in case znode exists
addStoreOrUpdateOps(
@@ -763,7 +770,7 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
}
@@ -816,7 +823,7 @@ public class ZKRMStateStore extends RMSt
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
- LOG.info("Session expired");
+ LOG.info("ZKRMStateStore Session expired");
createConnection();
break;
default:
@@ -991,13 +998,13 @@ public class ZKRMStateStore extends RMSt
throw new StoreFencedException();
}
} catch (KeeperException ke) {
+ LOG.info("Exception while executing a ZK operation.", ke);
if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Waiting for zookeeper to be connected, retry no. + "
- + retry);
+ LOG.info("Retrying operation on ZK. Retry no. " + retry);
Thread.sleep(zkRetryInterval);
continue;
}
- LOG.debug("Error while doing ZK operation.", ke);
+ LOG.info("Maxed out ZK retries. Giving up!");
throw ke;
}
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Jun 7 16:29:10 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.util.resou
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
- implements ResourceScheduler {
+ extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
@@ -62,6 +63,15 @@ public abstract class AbstractYarnSchedu
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+ /**
+ * Construct the service.
+ *
+ * @param name service name
+ */
+ public AbstractYarnScheduler(String name) {
+ super(name);
+ }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sat Jun 7 16:29:10 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+ /**
+ * Set RMContext for <code>ResourceScheduler</code>.
+ * This method should be called immediately after instantiating
+ * a scheduler once.
+ * @param rmContext created by ResourceManager
+ */
+ void setRMContext(RMContext rmContext);
+
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Sat Jun 7 16:29:10 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -76,6 +77,9 @@ public class SchedulerApplicationAttempt
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
+ private Resource amResource;
+ private boolean unmanagedAM = true;
+ private boolean amRunning = false;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@@ -106,6 +110,17 @@ public class SchedulerApplicationAttempt
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager);
this.queue = queue;
+
+ if (rmContext != null && rmContext.getRMApps() != null &&
+ rmContext.getRMApps()
+ .containsKey(applicationAttemptId.getApplicationId())) {
+ ApplicationSubmissionContext appSubmissionContext =
+ rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+ .getApplicationSubmissionContext();
+ if (appSubmissionContext != null) {
+ unmanagedAM = appSubmissionContext.getUnmanagedAM();
+ }
+ }
}
/**
@@ -168,6 +183,26 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getQueueName();
}
+ public Resource getAMResource() {
+ return amResource;
+ }
+
+ public void setAMResource(Resource amResource) {
+ this.amResource = amResource;
+ }
+
+ public boolean isAmRunning() {
+ return amRunning;
+ }
+
+ public void setAmRunning(boolean bool) {
+ amRunning = bool;
+ }
+
+ public boolean getUnmanagedAM() {
+ return unmanagedAM;
+ }
+
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Jun 7 16:29:10 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private CSQueue root;
+ // timeout to join when we stop this service
+ protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
private int numNodeManagers = 0;
- private boolean initialized = false;
-
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
- public CapacityScheduler() {}
+ public CapacityScheduler() {
+ super(CapacityScheduler.class.getName());
+ }
@Override
public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
}
@Override
- public RMContext getRMContext() {
+ public synchronized RMContext getRMContext() {
return this.rmContext;
}
-
+
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ private synchronized void initScheduler(Configuration configuration) throws
+ IOException {
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ this.minimumAllocation = this.conf.getMinimumAllocation();
+ this.maximumAllocation = this.conf.getMaximumAllocation();
+ this.calculator = this.conf.getResourceCalculator();
+ this.usePortForNodeName = this.conf.getUsePortForNodeName();
+ this.applications =
+ new ConcurrentHashMap<ApplicationId,
+ SchedulerApplication<FiCaSchedulerApp>>();
+
+ initializeQueues(this.conf);
+
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ asyncScheduleInterval =
+ this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+ if (scheduleAsynchronously) {
+ asyncSchedulerThread = new AsyncScheduleThread(this);
+ }
+
+ LOG.info("Initialized CapacityScheduler with " +
+ "calculator=" + getResourceCalculator().getClass() + ", " +
+ "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+ "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+ "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+ "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+ }
+
+ private synchronized void startSchedulerThreads() {
+ if (scheduleAsynchronously) {
+ Preconditions.checkNotNull(asyncSchedulerThread,
+ "asyncSchedulerThread is null");
+ asyncSchedulerThread.start();
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- if (!initialized) {
- this.rmContext = rmContext;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
- this.calculator = this.conf.getResourceCalculator();
- this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ initScheduler(configuration);
+ super.serviceInit(conf);
+ }
- initializeQueues(this.conf);
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval =
- this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
- if (scheduleAsynchronously) {
- asyncSchedulerThread = new AsyncScheduleThread(this);
- asyncSchedulerThread.start();
- }
-
- initialized = true;
- LOG.info("Initialized CapacityScheduler with " +
- "calculator=" + getResourceCalculator().getClass() + ", " +
- "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
- "asynchronousScheduling=" + scheduleAsynchronously + ", " +
- "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-
- } else {
- CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- try {
- LOG.info("Re-initializing queues...");
- reinitializeQueues(this.conf);
- } catch (Throwable t) {
- this.conf = oldConf;
- throw new IOException("Failed to re-init queues", t);
+ @Override
+ public void serviceStart() throws Exception {
+ startSchedulerThreads();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized (this) {
+ if (scheduleAsynchronously && asyncSchedulerThread != null) {
+ asyncSchedulerThread.interrupt();
+ asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized void
+ reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ Configuration configuration = new Configuration(conf);
+ CapacitySchedulerConfiguration oldConf = this.conf;
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ try {
+ LOG.info("Re-initializing queues...");
+ reinitializeQueues(this.conf);
+ } catch (Throwable t) {
+ this.conf = oldConf;
+ throw new IOException("Failed to re-init queues", t);
+ }
}
long getAsyncScheduleInterval() {
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Sat Jun 7 16:29:10 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
+ // Maximum resource share for each leaf queue that can be used to run AMs
+ final Map<String, Float> queueMaxAMShares;
+ private final float queueMaxAMShareDefault;
+
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
@@ -84,8 +88,9 @@ public class AllocationConfiguration {
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault,
+ Map<String, ResourceWeights> queueWeights,
+ Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+ int queueMaxAppsDefault, float queueMaxAMShareDefault,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
@@ -97,9 +102,11 @@ public class AllocationConfiguration {
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
+ this.queueMaxAMShares = queueMaxAMShares;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -116,8 +123,10 @@ public class AllocationConfiguration {
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
+ queueMaxAMShares = new HashMap<String, Float>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
+ queueMaxAMShareDefault = 1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@@ -184,6 +193,11 @@ public class AllocationConfiguration {
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
+ public float getQueueMaxAMShare(String queue) {
+ Float maxAMShare = queueMaxAMShares.get(queue);
+ return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+ }
+
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Sat Jun 7 16:29:10 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-
+
+ public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf);
- super.init(conf);
- }
-
- @Override
- public void start() {
- if (allocFile == null) {
- return;
- }
- reloadThread = new Thread() {
- public void run() {
- while (running) {
- long time = clock.getTime();
- long lastModified = allocFile.lastModified();
- if (lastModified > lastSuccessfulReload &&
- time > lastModified + ALLOC_RELOAD_WAIT_MS) {
- try {
- reloadAllocations();
- } catch (Exception ex) {
+ if (allocFile != null) {
+ reloadThread = new Thread() {
+ @Override
+ public void run() {
+ while (running) {
+ long time = clock.getTime();
+ long lastModified = allocFile.lastModified();
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+ try {
+ reloadAllocations();
+ } catch (Exception ex) {
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload fair scheduler config file - " +
+ "will use existing allocations.", ex);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ } else if (lastModified == 0l) {
if (!lastReloadAttemptFailed) {
- LOG.error("Failed to reload fair scheduler config file - " +
- "will use existing allocations.", ex);
+ LOG.warn("Failed to reload fair scheduler config file because" +
+ " last modified returned 0. File exists: "
+ + allocFile.exists());
}
lastReloadAttemptFailed = true;
}
- } else if (lastModified == 0l) {
- if (!lastReloadAttemptFailed) {
- LOG.warn("Failed to reload fair scheduler config file because" +
- " last modified returned 0. File exists: " + allocFile.exists());
+ try {
+ Thread.sleep(reloadIntervalMs);
+ } catch (InterruptedException ex) {
+ LOG.info(
+ "Interrupted while waiting to reload alloc configuration");
}
- lastReloadAttemptFailed = true;
- }
- try {
- Thread.sleep(reloadIntervalMs);
- } catch (InterruptedException ex) {
- LOG.info("Interrupted while waiting to reload alloc configuration");
}
}
- }
- };
- reloadThread.setName("AllocationFileReloader");
- reloadThread.setDaemon(true);
- reloadThread.start();
- super.start();
+ };
+ reloadThread.setName("AllocationFileReloader");
+ reloadThread.setDaemon(true);
+ }
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ public void serviceStart() throws Exception {
+ if (reloadThread != null) {
+ reloadThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
running = false;
- reloadThread.interrupt();
- super.stop();
+ if (reloadThread != null) {
+ reloadThread.interrupt();
+ try {
+ reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("reloadThread fails to join.");
+ }
+ }
+ super.serviceStop();
}
/**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+ Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
+ float queueMaxAMShareDefault = 1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -273,6 +288,11 @@ public class AllocationFileLoaderService
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
+ } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShareDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
@@ -297,8 +317,8 @@ public class AllocationFileLoaderService
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts, queueAcls,
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
}
@@ -313,8 +333,8 @@ public class AllocationFileLoaderService
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
- queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+ queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+ queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
newPlacementPolicy, configuredQueues);
@@ -329,7 +349,8 @@ public class AllocationFileLoaderService
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+ Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+ Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -361,6 +382,11 @@ public class AllocationFileLoaderService
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
+ } else if ("maxAMShare".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShares.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
@@ -383,8 +409,9 @@ public class AllocationFileLoaderService
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts, queueAcls, configuredQueues);
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
+ configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Sat Jun 7 16:29:10 2014
@@ -267,6 +267,13 @@ public class AppSchedulable extends Sche
node.allocateContainer(app.getApplicationId(),
allocatedContainer);
+ // If this container is used to run AM, update the leaf queue's AM usage
+ if (app.getLiveContainers().size() == 1 &&
+ !app.getUnmanagedAM()) {
+ queue.addAMResourceUsage(container.getResource());
+ app.setAmRunning(true);
+ }
+
return container.getResource();
} else {
// The desired container won't fit here, so reserve
@@ -297,6 +304,14 @@ public class AppSchedulable extends Sche
app.addSchedulingOpportunity(priority);
+ // Check the AM resource usage for the leaf queue
+ if (app.getLiveContainers().size() == 0
+ && !app.getUnmanagedAM()) {
+ if (!queue.canRunAppAM(app.getAMResource())) {
+ return Resources.none();
+ }
+ }
+
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Sat Jun 7 16:29:10 2014
@@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
+ // Track the AM resource usage for this queue
+ private Resource amResourceUsage;
+
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,6 +66,7 @@ public class FSLeafQueue extends FSQueue
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
+ amResourceUsage = Resource.newInstance(0, 0);
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -86,6 +90,10 @@ public class FSLeafQueue extends FSQueue
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
+ // Update AM resource usage
+ if (app.isAmRunning() && app.getAMResource() != null) {
+ Resources.subtractFrom(amResourceUsage, app.getAMResource());
+ }
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
@@ -145,6 +153,10 @@ public class FSLeafQueue extends FSQueue
return usage;
}
+ public Resource getAmResourceUsage() {
+ return amResourceUsage;
+ }
+
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue
@@ -284,4 +296,26 @@ public class FSLeafQueue extends FSQueue
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ /**
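+ * Check whether this queue can run the given application master without
+ * its AM resource usage going over the queue's fair share times maxAMShare.
+ *
+ * @param amResource the resource the application master would use
+ * @return true if the queue's policy finds the resulting AM usage within the limit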
+ */
+ public boolean canRunAppAM(Resource amResource) {
+ float maxAMShare =
+ scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+ Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+ Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+ return !policy
+ .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+ }
+
+ public void addAMResourceUsage(Resource amResource) {
+ if (amResource != null) {
+ Resources.addTo(amResourceUsage, amResource);
+ }
+ }
}