You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2017/07/13 21:11:36 UTC
hadoop git commit: MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 7c8d4a1b3 -> c180005fc
MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.
(cherry picked from commit a5c0476a990ec1e7eb34ce2462a45aa52cc1350d)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c180005f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c180005f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c180005f
Branch: refs/heads/branch-2.8
Commit: c180005fcf3d47771f5924c96ee7753b07fa6d44
Parents: 7c8d4a1
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed Jun 28 10:50:09 2017 +0900
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Thu Jul 13 13:30:33 2017 -0700
----------------------------------------------------------------------
.../v2/app/rm/RMContainerAllocator.java | 6 +-
.../v2/app/rm/TestRMContainerAllocator.java | 73 ++++++++++++++++++--
2 files changed, 73 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c180005f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 55fc7bc..2af670c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -842,7 +842,8 @@ public class RMContainerAllocator extends RMContainerRequestor
private void applyConcurrentTaskLimits() {
int numScheduledMaps = scheduledRequests.maps.size();
- if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+ if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
+ getJob().getTotalMaps() > maxRunningMaps) {
int maxRequestedMaps = Math.max(0,
maxRunningMaps - assignedRequests.maps.size());
int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@@ -857,7 +858,8 @@ public class RMContainerAllocator extends RMContainerRequestor
}
int numScheduledReduces = scheduledRequests.reduces.size();
- if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+ if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
+ getJob().getTotalReduces() > maxRunningReduces) {
int maxRequestedReduces = Math.max(0,
maxRunningReduces - assignedRequests.reduces.size());
int reduceRequestLimit = Math.min(maxRequestedReduces,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c180005f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4e11a5c..023dcd8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -2733,14 +2733,77 @@ public class TestRMContainerAllocator {
}
@Test
+ public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
+ final int MAP_COUNT = 1;
+ final int REDUCE_COUNT = 1;
+ final int MAP_LIMIT = 1;
+ final int REDUCE_LIMIT = 1;
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+ conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+ when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
+ final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+ new SystemClock()) {
+ @Override
+ protected void register() {
+ }
+
+ @Override
+ protected ApplicationMasterProtocol createSchedulerProxy() {
+ return mockScheduler;
+ }
+
+ @Override
+ protected void setRequestLimit(Priority priority,
+ Resource capability, int limit) {
+ Assert.fail("setRequestLimit() should not be invoked");
+ }
+ };
+
+ // create some map requests
+ ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
+ for (int i = 0; i < reqMapEvents.length; ++i) {
+ reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+ }
+ allocator.sendRequests(Arrays.asList(reqMapEvents));
+ // create some reduce requests
+ ContainerRequestEvent[] reqReduceEvents =
+ new ContainerRequestEvent[REDUCE_COUNT];
+ for (int i = 0; i < reqReduceEvents.length; ++i) {
+ reqReduceEvents[i] =
+ createReq(jobId, i, 1024, new String[] {}, false, true);
+ }
+ allocator.sendRequests(Arrays.asList(reqReduceEvents));
+ allocator.schedule();
+ allocator.schedule();
+ allocator.schedule();
+ allocator.close();
+ }
+
+ @Test
public void testConcurrentTaskLimits() throws Exception {
+ final int MAP_COUNT = 5;
+ final int REDUCE_COUNT = 2;
final int MAP_LIMIT = 3;
final int REDUCE_LIMIT = 1;
LOG.info("Running testConcurrentTaskLimits");
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
- conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1);
@@ -2749,6 +2812,9 @@ public class TestRMContainerAllocator {
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+ when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
appAttemptId, mockJob, new SystemClock()) {
@@ -2763,14 +2829,13 @@ public class TestRMContainerAllocator {
};
// create some map requests
- ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+ ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) {
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
}
allocator.sendRequests(Arrays.asList(reqMapEvents));
-
// create some reduce requests
- ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+ ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
false, true);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org