You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/04/10 22:17:54 UTC
svn commit: r1466658 [2/2] - in
/hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Wed Apr 10 20:17:39 2013
@@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
@@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -95,12 +96,33 @@ public class TestContainerLocalizer {
public void testContainerLocalizerMain() throws Exception {
ContainerLocalizer localizer = setupContainerLocalizerForTest();
+ // verify created cache
+ List<Path> privCacheList = new ArrayList<Path>();
+ List<Path> appCacheList = new ArrayList<Path>();
+ for (Path p : localDirs) {
+ Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+ Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+ privCacheList.add(privcache);
+ Path appDir =
+ new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+ Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+ appCacheList.add(appcache);
+ }
+
// mock heartbeat responses from NM
- LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcC = getMockRsrc(random,
- LocalResourceVisibility.APPLICATION);
- LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+ ResourceLocalizationSpec rsrcA =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+ ResourceLocalizationSpec rsrcB =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+ ResourceLocalizationSpec rsrcC =
+ getMockRsrc(random, LocalResourceVisibility.APPLICATION,
+ appCacheList.get(0));
+ ResourceLocalizationSpec rsrcD =
+ getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+ privCacheList.get(0));
+
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcA)))
@@ -111,27 +133,33 @@ public class TestContainerLocalizer {
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcD)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
- Collections.<LocalResource>emptyList()))
+ Collections.<ResourceLocalizationSpec>emptyList()))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
null));
- doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
+ LocalResource tRsrcA = rsrcA.getResource();
+ LocalResource tRsrcB = rsrcB.getResource();
+ LocalResource tRsrcC = rsrcC.getResource();
+ LocalResource tRsrcD = rsrcD.getResource();
+ doReturn(
+ new FakeDownload(rsrcA.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcA),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
+ doReturn(
+ new FakeDownload(rsrcB.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcB),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
+ doReturn(
+ new FakeDownload(rsrcC.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcC),
isA(UserGroupInformation.class));
- doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
- localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
+ doReturn(
+ new FakeDownload(rsrcD.getResource().getResource().getFile(), true))
+ .when(localizer).download(isA(Path.class), eq(tRsrcD),
isA(UserGroupInformation.class));
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
-
- // verify created cache
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@@ -143,15 +171,14 @@ public class TestContainerLocalizer {
Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
}
-
// verify tokens read at expected location
verify(spylfs).open(tokenPath);
// verify downloaded resources reported to NM
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
- verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource())));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource())));
// verify all HB use localizerID provided
verify(nmProxy, never()).heartbeat(argThat(
@@ -306,10 +333,12 @@ public class TestContainerLocalizer {
return mockRF;
}
- static LocalResource getMockRsrc(Random r,
- LocalResourceVisibility vis) {
- LocalResource rsrc = mock(LocalResource.class);
+ static ResourceLocalizationSpec getMockRsrc(Random r,
+ LocalResourceVisibility vis, Path p) {
+ ResourceLocalizationSpec resourceLocalizationSpec =
+ mock(ResourceLocalizationSpec.class);
+ LocalResource rsrc = mock(LocalResource.class);
String name = Long.toHexString(r.nextLong());
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
@@ -322,7 +351,10 @@ public class TestContainerLocalizer {
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(vis);
- return rsrc;
+ when(resourceLocalizationSpec.getResource()).thenReturn(rsrc);
+ when(resourceLocalizationSpec.getDestinationDirectory()).
+ thenReturn(ConverterUtils.getYarnUrlFromPath(p));
+ return resourceLocalizationSpec;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Wed Apr 10 20:17:39 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -375,7 +377,7 @@ public class TestResourceLocalizationSer
}
}
- @Test
+ @Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new YarnConfiguration();
@@ -386,12 +388,17 @@ public class TestResourceLocalizationSer
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
- String[] sDirs = new String[4];
- for (int i = 0; i < 4; ++i) {
- localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
- sDirs[i] = localDirs.get(i).toString();
- }
+ String[] sDirs = new String[1];
+ // Making sure that we have only one local disk so that it will only be
+ // selected for consecutive resource localization calls. This is required
+ // to test LocalCacheDirectoryManager.
+ localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+ sDirs[0] = localDirs.get(0).toString();
+
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ // Adding configuration so that there is only one file per directory
+ // (presumably 37 = 36 sub-directory slots + 1 file; TODO confirm)
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
@@ -452,12 +459,23 @@ public class TestResourceLocalizationSer
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
- final LocalResource resource = getPrivateMockedResource(r);
- final LocalResourceRequest req = new LocalResourceRequest(resource);
+ final LocalResource resource1 = getPrivateMockedResource(r);
+ LocalResource resource2 = null;
+ do {
+ resource2 = getPrivateMockedResource(r);
+ } while (resource2 == null || resource2.equals(resource1));
+ // The loop above makes sure we don't get two identical resources.
+
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+ final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
- rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+ List<LocalResourceRequest> privateResourceList =
+ new ArrayList<LocalResourceRequest>();
+ privateResourceList.add(req1);
+ privateResourceList.add(req2);
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(1000);
@@ -471,33 +489,67 @@ public class TestResourceLocalizationSer
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
- LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr);
- when(rsrcStat.getResource()).thenReturn(resource);
- when(rsrcStat.getLocalSize()).thenReturn(4344L);
+ when(rsrcStat1.getResource()).thenReturn(resource1);
+ when(rsrcStat2.getResource()).thenReturn(resource2);
+ when(rsrcStat1.getLocalSize()).thenReturn(4344L);
+ when(rsrcStat2.getLocalSize()).thenReturn(2342L);
URL locPath = getPath("/cache/private/blah");
- when(rsrcStat.getLocalPath()).thenReturn(locPath);
- when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrcStat1.getLocalPath()).thenReturn(locPath);
+ when(rsrcStat2.getLocalPath()).thenReturn(locPath);
+ when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList())
- .thenReturn(Collections.singletonList(rsrcStat))
+ .thenReturn(Collections.singletonList(rsrcStat1))
+ .thenReturn(Collections.singletonList(rsrcStat2))
.thenReturn(Collections.<LocalResourceStatus>emptyList());
- // get rsrc
+ String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
+ Path.SEPARATOR + "user0" + Path.SEPARATOR +
+ ContainerLocalizer.FILECACHE;
+
+ // get first resource
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
- assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+ assertEquals(1, response.getResourceSpecs().size());
+ assertEquals(req1,
+ new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
+ URL localizedPath =
+ response.getResourceSpecs().get(0).getDestinationDirectory();
+ // The unique number (10), generated as part of LocalResourcesTracker,
+ // is appended to the local path
+ assertTrue(localizedPath.getFile().endsWith(
+ localPath + Path.SEPARATOR + "10"));
+
+ // get second resource
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ assertEquals(1, response.getResourceSpecs().size());
+ assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
+ .get(0).getResource()));
+ localizedPath =
+ response.getResourceSpecs().get(0).getDestinationDirectory();
+ // Resource's destination path should be now inside sub directory 0 as
+ // LocalCacheDirectoryManager will be used and we have restricted number
+ // of files per directory to 1.
+ assertTrue(localizedPath.getFile().endsWith(
+ localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
// empty rsrc
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
- assertEquals(0, response.getAllResources().size());
+ assertEquals(0, response.getResourceSpecs().size());
// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+ dispatcher.await();
// verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc =
new ArgumentMatcher<ContainerEvent>() {
@@ -508,9 +560,9 @@ public class TestResourceLocalizationSer
&& c.getContainerID() == evt.getContainerID();
}
};
- dispatcher.await();
- verify(containerBus).handle(argThat(matchesContainerLoc));
-
+ // A total of 2 resource localization calls, one for each resource.
+ verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Apr 10 20:17:39 2013
@@ -54,15 +54,17 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -276,6 +278,14 @@ public class ApplicationMasterService ex
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
+ // sanity check
+ try {
+ SchedulerUtils.validateResourceRequests(ask,
+ rScheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("Invalid resource ask by application " + appAttemptId, e);
+ throw RPCUtil.getRemoteException(e);
+ }
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Apr 10 20:17:39 2013
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -86,8 +87,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -273,6 +277,21 @@ public class ClientRMService extends Abs
// Safety
submissionContext.setUser(user);
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getAMContainerSpec().getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + applicationId, e);
+ throw RPCUtil.getRemoteException(e);
+ }
+ }
+
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Apr 10 20:17:39 2013
@@ -57,6 +57,7 @@ public class RMAppManager implements Eve
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+ private int globalMaxAppAttempts;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
@@ -76,6 +77,8 @@ public class RMAppManager implements Eve
setCompletedAppsMax(conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+ globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
}
/**
@@ -308,6 +311,7 @@ public class RMAppManager implements Eve
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
+ boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
@@ -318,16 +322,39 @@ public class RMAppManager implements Eve
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
LOG.info("Not recovering unmanaged application " + appState.getAppId());
- store.removeApplication(appState);
+ shouldRecover = false;
+ }
+ int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
+ .getMaxAppAttempts();
+ int maxAppAttempts;
+ if (individualMaxAppAttempts <= 0 ||
+ individualMaxAppAttempts > globalMaxAppAttempts) {
+ maxAppAttempts = globalMaxAppAttempts;
+ LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ + " for application: " + appState.getAppId()
+ + " is invalid, because it is out of the range [1, "
+ + globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
+ maxAppAttempts = individualMaxAppAttempts;
+ }
+ if(appState.getAttemptCount() >= maxAppAttempts) {
+ LOG.info("Not recovering application " + appState.getAppId() +
+ " due to recovering attempt is beyond maxAppAttempt limit");
+ shouldRecover = false;
+ }
+
+ if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
- appState.getSubmitTime());
+ appState.getSubmitTime());
// re-populate attempt information in application
RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
- appState.getAppId());
+ appState.getAppId());
appImpl.recover(state);
}
+ else {
+ store.removeApplication(appState);
+ }
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Apr 10 20:17:39 2013
@@ -327,12 +327,52 @@ public class ResourceManager extends Com
this.applicationACLsManager, this.conf);
}
+ // sanity check for configurations
protected static void validateConfigs(Configuration conf) {
- int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ // validate max-attempts
+ int globalMaxAppAttempts =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
if (globalMaxAppAttempts <= 0) {
- throw new YarnException(
- "The global max attempts should be a positive integer.");
+ throw new YarnException("Invalid global max attempts configuration"
+ + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
+ }
+
+ // validate scheduler memory allocation setting
+ int minMem = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ int maxMem = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+
+ if (minMem <= 0 || minMem > maxMem) {
+ throw new YarnException("Invalid resource scheduler memory"
+ + " allocation configuration"
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ + "=" + minMem
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ + "=" + maxMem + ", min and max should be greater than 0"
+ + ", max should be no smaller than min.");
+ }
+
+ // validate scheduler vcores allocation setting
+ int minVcores = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ int maxVcores = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ if (minVcores <= 0 || minVcores > maxVcores) {
+ throw new YarnException("Invalid resource scheduler vcores"
+ + " allocation configuration"
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ + "=" + minVcores
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ + "=" + maxVcores + ", min and max should be greater than 0"
+ + ", max should be no smaller than min.");
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java Wed Apr 10 20:17:39 2013
@@ -53,12 +53,14 @@ public class DefaultResourceCalculator e
}
@Override
- public Resource normalize(Resource r, Resource minimumResource) {
- return Resources.createResource(
+ public Resource normalize(Resource r, Resource minimumResource,
+ Resource maximumResource) {
+ int normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
- minimumResource.getMemory())
- );
+ minimumResource.getMemory()),
+ maximumResource.getMemory());
+ return Resources.createResource(normalizedMemory);
}
@Override
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java Wed Apr 10 20:17:39 2013
@@ -123,15 +123,20 @@ public class DominantResourceCalculator
}
@Override
- public Resource normalize(Resource r, Resource minimumResource) {
- return Resources.createResource(
+ public Resource normalize(Resource r, Resource minimumResource,
+ Resource maximumResource) {
+ int normalizedMemory = Math.min(
roundUp(
- Math.max(r.getMemory(), minimumResource.getMemory()),
+ Math.max(r.getMemory(), minimumResource.getMemory()),
minimumResource.getMemory()),
+ maximumResource.getMemory());
+ int normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
- minimumResource.getVirtualCores())
- );
+ minimumResource.getVirtualCores()),
+ maximumResource.getVirtualCores());
+ return Resources.createResource(normalizedMemory,
+ normalizedCores);
}
@Override
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java Wed Apr 10 20:17:39 2013
@@ -88,13 +88,16 @@ public abstract class ResourceCalculator
/**
* Normalize resource <code>r</code> given the base
- * <code>minimumResource</code>.
+ * <code>minimumResource</code> and cap it at the maximum allowed
+ * <code>maximumResource</code>.
*
* @param r resource
* @param minimumResource step-factor
+ * @param maximumResource the upper bound of the resource to be allocated
* @return normalized resource
*/
- public abstract Resource normalize(Resource r, Resource minimumResource);
+ public abstract Resource normalize(Resource r, Resource minimumResource,
+ Resource maximumResource);
/**
* Round-up resource <code>r</code> given factor <code>stepFactor</code>.
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Wed Apr 10 20:17:39 2013
@@ -132,8 +132,9 @@ public class Resources {
}
public static Resource normalize(
- ResourceCalculator calculator, Resource lhs, Resource factor) {
- return calculator.normalize(lhs, factor);
+ ResourceCalculator calculator, Resource lhs, Resource factor,
+ Resource limit) {
+ return calculator.normalize(lhs, factor, limit);
}
public static Resource roundUp(
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Apr 10 20:17:39 2013
@@ -740,6 +740,8 @@ public class RMAppAttemptImpl implements
AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
+ // SchedulerUtils.validateResourceRequests is not necessary here because
+ // the AM resource has already been checked at submission time
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Wed Apr 10 20:17:39 2013
@@ -89,10 +89,12 @@ public class SchedulerUtils {
List<ResourceRequest> asks,
ResourceCalculator resourceCalculator,
Resource clusterResource,
- Resource minimumResource) {
+ Resource minimumResource,
+ Resource maximumResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
- ask, resourceCalculator, clusterResource, minimumResource);
+ ask, resourceCalculator, clusterResource, minimumResource,
+ maximumResource);
}
}
@@ -104,11 +106,50 @@ public class SchedulerUtils {
ResourceRequest ask,
ResourceCalculator resourceCalculator,
Resource clusterResource,
- Resource minimumResource) {
+ Resource minimumResource,
+ Resource maximumResource) {
Resource normalized =
Resources.normalize(
- resourceCalculator, ask.getCapability(), minimumResource);
+ resourceCalculator, ask.getCapability(), minimumResource,
+ maximumResource);
ask.setCapability(normalized);
}
+ /**
+ * Utility method to validate a resource request, by ensuring that the
+ * requested memory/vcore is non-negative and not greater than max
+ */
+ public static void validateResourceRequest(ResourceRequest resReq,
+ Resource maximumResource) throws InvalidResourceRequestException {
+ if (resReq.getCapability().getMemory() < 0 ||
+ resReq.getCapability().getMemory() > maximumResource.getMemory()) {
+ throw new InvalidResourceRequestException("Invalid resource request"
+ + ", requested memory < 0"
+ + ", or requested memory > max configured"
+ + ", requestedMemory=" + resReq.getCapability().getMemory()
+ + ", maxMemory=" + maximumResource.getMemory());
+ }
+ if (resReq.getCapability().getVirtualCores() < 0 ||
+ resReq.getCapability().getVirtualCores() >
+ maximumResource.getVirtualCores()) {
+ throw new InvalidResourceRequestException("Invalid resource request"
+ + ", requested virtual cores < 0"
+ + ", or requested virtual cores > max configured"
+ + ", requestedVirtualCores="
+ + resReq.getCapability().getVirtualCores()
+ + ", maxVirtualCores=" + maximumResource.getVirtualCores());
+ }
+ }
+
+ /**
+ * Utility method to validate a list of resource requests, by ensuring that the
+ * requested memory/vcore is non-negative and not greater than max
+ */
+ public static void validateResourceRequests(List<ResourceRequest> ask,
+ Resource maximumResource) throws InvalidResourceRequestException {
+ for (ResourceRequest resReq : ask) {
+ validateResourceRequest(resReq, maximumResource);
+ }
+ }
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Apr 10 20:17:39 2013
@@ -483,7 +483,8 @@ implements ResourceScheduler, CapacitySc
// Sanity check
SchedulerUtils.normalizeRequests(
- ask, calculator, getClusterResources(), minimumAllocation);
+ ask, calculator, getClusterResources(), minimumAllocation,
+ maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Wed Apr 10 20:17:39 2013
@@ -315,8 +315,8 @@ public class CapacitySchedulerConfigurat
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
int maximumCores = getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES);
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
return Resources.createResource(maximumMemory, maximumCores);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Apr 10 20:17:39 2013
@@ -89,6 +89,8 @@ public class LeafQueue implements CSQueu
private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
private int maxActiveApplicationsPerUser;
+ private int nodeLocalityDelay;
+
private Resource usedResources = Resources.createResource(0, 0);
private float usedCapacity = 0.0f;
private volatile int numContainers;
@@ -123,8 +125,6 @@ public class LeafQueue implements CSQueu
private final ActiveUsersManager activeUsersManager;
- private final int nodeLocalityDelay;
-
private final ResourceCalculator resourceCalculator;
public LeafQueue(CapacitySchedulerContext cs,
@@ -196,9 +196,6 @@ public class LeafQueue implements CSQueu
Map<QueueACL, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
- this.nodeLocalityDelay =
- cs.getConfiguration().getNodeLocalityDelay();
-
setupQueueConfigs(
cs.getClusterResources(),
capacity, absoluteCapacity,
@@ -206,7 +203,7 @@ public class LeafQueue implements CSQueu
userLimit, userLimitFactor,
maxApplications, maxApplicationsPerUser,
maxActiveApplications, maxActiveApplicationsPerUser,
- state, acls);
+ state, acls, cs.getConfiguration().getNodeLocalityDelay());
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
@@ -227,7 +224,8 @@ public class LeafQueue implements CSQueu
int userLimit, float userLimitFactor,
int maxApplications, int maxApplicationsPerUser,
int maxActiveApplications, int maxActiveApplicationsPerUser,
- QueueState state, Map<QueueACL, AccessControlList> acls)
+ QueueState state, Map<QueueACL, AccessControlList> acls,
+ int nodeLocalityDelay)
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -256,6 +254,8 @@ public class LeafQueue implements CSQueu
this.queueInfo.setCapacity(this.capacity);
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
this.queueInfo.setQueueState(this.state);
+
+ this.nodeLocalityDelay = nodeLocalityDelay;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@@ -319,7 +319,8 @@ public class LeafQueue implements CSQueu
"state = " + state +
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
- " [= configuredAcls ]" + "\n");
+ " [= configuredAcls ]" + "\n" +
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n");
}
@Override
@@ -605,7 +606,8 @@ public class LeafQueue implements CSQueu
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
- newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
+ newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
+ newlyParsedLeafQueue.getNodeLocalityDelay());
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Apr 10 20:17:39 2013
@@ -650,11 +650,12 @@ public class FairScheduler implements Re
*
* @param asks a list of resource requests
* @param minMemory the configured minimum memory allocation
+ * @param maxMemory the configured maximum memory allocation
*/
static void normalizeRequests(List<ResourceRequest> asks,
- int minMemory) {
+ int minMemory, int maxMemory) {
for (ResourceRequest ask : asks) {
- normalizeRequest(ask, minMemory);
+ normalizeRequest(ask, minMemory, maxMemory);
}
}
@@ -664,11 +665,14 @@ public class FairScheduler implements Re
*
* @param ask the resource request
* @param minMemory the configured minimum memory allocation
+ * @param maxMemory the configured maximum memory allocation
*/
- static void normalizeRequest(ResourceRequest ask, int minMemory) {
+ static void normalizeRequest(ResourceRequest ask, int minMemory,
+ int maxMemory) {
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
- ask.getCapability().setMemory(
- minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
+ int normalizedMemory =
+ minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
+ ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
}
@Override
@@ -684,7 +688,8 @@ public class FairScheduler implements Re
}
// Sanity check
- normalizeRequests(ask, minimumAllocation.getMemory());
+ normalizeRequests(ask, minimumAllocation.getMemory(),
+ maximumAllocation.getMemory());
// Release containers
for (ContainerId releasedContainerId : release) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Apr 10 20:17:39 2013
@@ -232,7 +232,7 @@ public class FifoScheduler implements Re
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation);
+ clusterResource, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainer : release) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Apr 10 20:17:39 2013
@@ -128,21 +128,28 @@ public class MockRM extends ResourceMana
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
- return submitApp(masterMemory, name, user, null, false, null);
+ return submitApp(masterMemory, name, user, null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
- return submitApp(masterMemory, name, user, acls, false, null);
+ return submitApp(masterMemory, name, user, acls, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception {
- return submitApp(masterMemory, name, user, acls, false, queue);
- }
+ return submitApp(masterMemory, name, user, acls, false, queue,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+ }
public RMApp submitApp(int masterMemory, String name, String user,
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception {
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts) throws Exception {
ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -155,6 +162,7 @@ public class MockRM extends ResourceMana
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setUser(user);
+ sub.setMaxAppAttempts(maxAppAttempts);
if(unmanaged) {
sub.setUnmanagedAM(true);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Apr 10 20:17:39 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -254,6 +255,12 @@ public class TestClientRMService {
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+ when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
@@ -311,13 +318,54 @@ public class TestClientRMService {
endBarrier.await();
t.join();
}
+
+ @Test (timeout = 30000)
+ public void testInvalidResourceRequestWhenSubmittingApplication()
+ throws IOException, InterruptedException, BrokenBarrierException {
+ YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+ when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(yarnScheduler, rmContext);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+
+ final ApplicationId appId = getApplicationId(100);
+ final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
+ when(submitRequest.getApplicationSubmissionContext().getAMContainerSpec()
+ .getResource()).thenReturn(resource);
+
+ final ClientRMService rmService =
+ new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+ // submit an app
+ try {
+ rmService.submitApplication(submitRequest);
+ Assert.fail("Application submission should fail because resource" +
+ " request is invalid.");
+ } catch (YarnRemoteException e) {
+ // Exception is expected
+ Assert.assertTrue("The thrown exception is not" +
+ " InvalidResourceRequestException",
+ e.getMessage().startsWith("Invalid resource request"));
+ }
+ }
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
String user = MockApps.newUserName();
String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
- Resource resource = mock(Resource.class);
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
when(amContainerSpec.getResource()).thenReturn(resource);
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Apr 10 20:17:39 2013
@@ -198,6 +198,8 @@ public class TestFifoScheduler {
int allocMB = 1536;
YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, allocMB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ allocMB * 10);
// Test for something lesser than this.
testMinimumAllocation(conf, allocMB / 2);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Apr 10 20:17:39 2013
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -62,6 +64,7 @@ public class TestRMRestart {
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
@@ -152,7 +155,9 @@ public class TestRMRestart {
.getApplicationId());
// create unmanaged app
- RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null);
+ RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
+ null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved
@@ -306,4 +311,74 @@ public class TestRMRestart {
Assert.assertEquals(0, rmAppState.size());
}
+ @Test
+ public void testRMRestartOnMaxAppAttempts() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ ExitUtil.disableSystemExit();
+
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE,
+ "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // submit an app with maxAppAttempts equal to 1
+ RMApp app1 = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1);
+ // submit an app with maxAppAttempts equal to -1
+ RMApp app2 = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", -1);
+
+ // assert app1 info is saved
+ ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ Assert.assertNotNull(appState);
+ Assert.assertEquals(0, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), app1.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ // Allocate the AM
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt = app1.getCurrentAppAttempt();
+ ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+ rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+ Assert.assertEquals(1, appState.getAttemptCount());
+ ApplicationAttemptState attemptState =
+ appState.getAttempt(attemptId1);
+ Assert.assertNotNull(attemptState);
+ Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
+ attemptState.getMasterContainer().getId());
+ rm1.stop();
+
+ // start new RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // verify that maxAppAttempts is set to global value
+ Assert.assertEquals(2,
+ rm2.getRMContext().getRMApps().get(app2.getApplicationId())
+ .getMaxAppAttempts());
+
+ // verify that app2 exists and app1 is removed
+ Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+ Assert.assertNotNull(rm2.getRMContext().getRMApps()
+ .get(app2.getApplicationId()));
+ Assert.assertNull(rm2.getRMContext().getRMApps()
+ .get(app1.getApplicationId()));
+
+ // stop the RM
+ rm2.stop();
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Apr 10 20:17:39 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -192,9 +193,41 @@ public class TestResourceManager {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
try {
resourceManager.init(conf);
- fail("Exception is expected because the global max attempts is negative.");
+ fail("Exception is expected because the global max attempts" +
+ " is negative.");
} catch (YarnException e) {
// Exception is expected.
+ assertTrue("The thrown exception is not the expected one.",
+ e.getMessage().startsWith(
+ "Invalid global max attempts configuration"));
+ }
+
+ conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
+ try {
+ resourceManager.init(conf);
+ fail("Exception is expected because the min memory allocation is" +
+ " larger than the max memory allocation.");
+ } catch (YarnException e) {
+ // Exception is expected.
+ assertTrue("The thrown exception is not the expected one.",
+ e.getMessage().startsWith(
+ "Invalid resource scheduler memory"));
+ }
+
+ conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
+ try {
+ resourceManager.init(conf);
+ fail("Exception is expected because the min vcores allocation is" +
+ " larger than the max vcores allocation.");
+ } catch (YarnException e) {
+ // Exception is expected.
+ assertTrue("The thrown exception is not the expected one.",
+ e.getMessage().startsWith(
+ "Invalid resource scheduler vcores"));
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Wed Apr 10 20:17:39 2013
@@ -19,54 +19,92 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestSchedulerUtils {
- @Test
+ @Test (timeout = 30000)
public void testNormalizeRequest() {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
final int minMemory = 1024;
+ final int maxMemory = 8192;
Resource minResource = Resources.createResource(minMemory, 0);
+ Resource maxResource = Resources.createResource(maxMemory, 0);
ResourceRequest ask = new ResourceRequestPBImpl();
// case negative memory
ask.setCapability(Resources.createResource(-1024));
- SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case zero memory
ask.setCapability(Resources.createResource(0));
- SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case memory is a multiple of minMemory
ask.setCapability(Resources.createResource(2 * minMemory));
- SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemory());
// case memory is not a multiple of minMemory
ask.setCapability(Resources.createResource(minMemory + 10));
- SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemory());
+ // case memory is equal to max allowed
+ ask.setCapability(Resources.createResource(maxMemory));
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
+ assertEquals(maxMemory, ask.getCapability().getMemory());
+
+ // case memory is just less than max
+ ask.setCapability(Resources.createResource(maxMemory - 10));
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
+ assertEquals(maxMemory, ask.getCapability().getMemory());
+
+ // max is not a multiple of min
+ maxResource = Resources.createResource(maxMemory - 10, 0);
+ ask.setCapability(Resources.createResource(maxMemory - 100));
+ // multiple of minMemory > maxMemory, then reduce to maxMemory
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
+ assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
+
+ // ask is more than max
+ maxResource = Resources.createResource(maxMemory, 0);
+ ask.setCapability(Resources.createResource(maxMemory + 100));
+ SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
+ maxResource);
+ assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
}
- @Test
+ @Test (timeout = 30000)
public void testNormalizeRequestWithDominantResourceCalculator() {
ResourceCalculator resourceCalculator = new DominantResourceCalculator();
Resource minResource = Resources.createResource(1024, 1);
+ Resource maxResource = Resources.createResource(10240, 10);
Resource clusterResource = Resources.createResource(10 * 1024, 10);
ResourceRequest ask = new ResourceRequestPBImpl();
@@ -74,13 +112,13 @@ public class TestSchedulerUtils {
// case negative memory/vcores
ask.setCapability(Resources.createResource(-1024, -1));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource);
+ ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(minResource, ask.getCapability());
// case zero memory/vcores
ask.setCapability(Resources.createResource(0, 0));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource);
+ ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(minResource, ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(1024, ask.getCapability().getMemory());
@@ -88,9 +126,118 @@ public class TestSchedulerUtils {
// case non-zero memory & zero cores
ask.setCapability(Resources.createResource(1536, 0));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource);
+ ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(Resources.createResource(2048, 1), ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(2048, ask.getCapability().getMemory());
}
+
+ @Test (timeout = 30000)
+ public void testValidateResourceRequest() {
+ Resource maxResource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ // zero memory
+ try {
+ Resource resource = Resources.createResource(
+ 0,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ } catch (InvalidResourceRequestException e) {
+ fail("Zero memory should be accepted");
+ }
+
+ // zero vcores
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ 0);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ } catch (InvalidResourceRequestException e) {
+ fail("Zero vcores should be accepted");
+ }
+
+ // max memory
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ } catch (InvalidResourceRequestException e) {
+ fail("Max memory should be accepted");
+ }
+
+ // max vcores
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ } catch (InvalidResourceRequestException e) {
+ fail("Max vcores should be accepted");
+ }
+
+ // negative memory
+ try {
+ Resource resource = Resources.createResource(
+ -1,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ fail("Negative memory should not be accepted");
+ } catch (InvalidResourceRequestException e) {
+ // expected
+ }
+
+ // negative vcores
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ -1);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ fail("Negative vcores should not be accepted");
+ } catch (InvalidResourceRequestException e) {
+ // expected
+ }
+
+ // more than max memory
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ fail("More than max memory should not be accepted");
+ } catch (InvalidResourceRequestException e) {
+ // expected
+ }
+
+ // more than max vcores
+ try {
+ Resource resource = Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ + 1);
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
+ SchedulerUtils.validateResourceRequest(resReq, maxResource);
+ fail("More than max vcores should not be accepted");
+ } catch (InvalidResourceRequestException e) {
+ // expected
+ }
+ }
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Apr 10 20:17:39 2013
@@ -1623,6 +1623,30 @@ public class TestLeafQueue {
assertEquals(3, e.activeApplications.size());
assertEquals(0, e.pendingApplications.size());
}
+
+ @Test (timeout = 30000)
+ public void testNodeLocalityAfterQueueRefresh() throws Exception {
+
+ // Manipulate queue 'e'
+ LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+
+ // before reinitialization
+ assertEquals(0, e.getNodeLocalityDelay());
+
+ csConf.setInt(CapacitySchedulerConfiguration
+ .NODE_LOCALITY_DELAY, 60);
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+ CSQueue newRoot =
+ CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT,
+ newQueues, queues,
+ TestUtils.spyHook);
+ queues = newQueues;
+ root.reinitialize(newRoot, cs.getClusterResources());
+
+ // after reinitialization
+ assertEquals(60, e.getNodeLocalityDelay());
+ }
@Test (timeout = 30000)
public void testActivateApplicationByUpdatingClusterResource()
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Apr 10 20:17:39 2013
@@ -110,6 +110,8 @@ public class TestFairScheduler {
public void setUp() throws IOException {
scheduler = new FairScheduler();
Configuration conf = createConfiguration();
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new ResourceManager();