You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 10:10:15 UTC
[17/50] [abbrv] hadoop git commit: MAPREDUCE-5002. AM could
potentially allocate a reduce container to a map attempt. Contributed by
Chang Li
MAPREDUCE-5002. AM could potentially allocate a reduce container to a map attempt. Contributed by Chang Li
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f82f582
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f82f582
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f82f582
Branch: refs/heads/HDFS-7966
Commit: 3f82f582e51c514cc0f052c828c2f58c6e8927ad
Parents: 58d1a02
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 17 18:17:29 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 17 18:17:29 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/rm/RMContainerAllocator.java | 10 +-
.../v2/app/rm/TestRMContainerAllocator.java | 110 +++++++++++++++++++
3 files changed, 119 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index cde6d92..6cf7abb 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -570,6 +570,9 @@ Release 2.8.0 - UNRELEASED
position/key information for uncompressed input sometimes. (Zhihai Xu via
jlowe)
+ MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
+ attempt (Chang Li via jlowe)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index ac4c586..78b0dc4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -1004,6 +1004,7 @@ public class RMContainerAllocator extends RMContainerRequestor
Iterator<Container> it = allocatedContainers.iterator();
LOG.info("Got allocated containers " + allocatedContainers.size());
containersAllocated += allocatedContainers.size();
+ int reducePending = reduces.size();
while (it.hasNext()) {
Container allocated = it.next();
if (LOG.isDebugEnabled()) {
@@ -1034,13 +1035,14 @@ public class RMContainerAllocator extends RMContainerRequestor
else if (PRIORITY_REDUCE.equals(priority)) {
if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
reduceResourceRequest, getSchedulerResourceTypes()) <= 0
- || reduces.isEmpty()) {
- LOG.info("Cannot assign container " + allocated
+ || (reducePending <= 0)) {
+ LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceRequest
- + " or no pending reduce tasks - reduces.isEmpty="
- + reduces.isEmpty());
+ + " or no pending reduce tasks.");
isAssignable = false;
+ } else {
+ reducePending--;
}
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e148c32..c98ccd3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -562,6 +563,52 @@ public class TestRMContainerAllocator {
assignedRequests.preemptionWaitingReduces.size());
}
+ @Test(timeout = 30000)
+ public void testExcessReduceContainerAssign() throws Exception {
+ final Configuration conf = new Configuration();
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+ final MyResourceManager2 rm = new MyResourceManager2(conf);
+ rm.start();
+ final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
+ .getDispatcher();
+ final RMApp app = rm.submitApp(2048);
+ dispatcher.await();
+ final String host = "host1";
+ final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
+ nm.nodeHeartbeat(true);
+ dispatcher.await();
+ final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+ final JobId jobId = MRBuilderUtils
+ .newJobId(appAttemptId.getApplicationId(), 0);
+ final Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // request to allocate two reduce priority containers
+ final String[] locations = new String[] { host };
+ allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+ allocator.scheduleAllReduces();
+ allocator.makeRemoteRequest();
+ nm.nodeHeartbeat(true);
+ dispatcher.await();
+ allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+
+ int assignedContainer;
+ for (assignedContainer = 0; assignedContainer < 1;) {
+ assignedContainer += allocator.schedule().size();
+ nm.nodeHeartbeat(true);
+ dispatcher.await();
+ }
+ // only 1 allocated container should be assigned
+ Assert.assertEquals(assignedContainer, 1);
+ }
+
@Test
public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
@@ -770,6 +817,17 @@ public class TestRMContainerAllocator {
}
}
+  /**
+   * MyResourceManager variant whose scheduler is an
+   * ExcessReduceContainerAllocateScheduler built from this resource
+   * manager's RMContext.
+   */
+  private static class MyResourceManager2 extends MyResourceManager {
+    public MyResourceManager2(Configuration configuration) {
+      super(configuration);
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      final RMContext context = getRMContext();
+      return new ExcessReduceContainerAllocateScheduler(context);
+    }
+  }
+
@Test
public void testReportedAppProgress() throws Exception {
@@ -1595,6 +1653,58 @@ public class TestRMContainerAllocator {
}
}
+  /**
+   * FifoScheduler variant that hands back one container more than was
+   * actually allocated: whenever the normal allocation contains at least one
+   * container, a mocked container at reduce priority is appended to it.
+   */
+  private static class ExcessReduceContainerAllocateScheduler extends FifoScheduler {
+
+    /**
+     * Creates the scheduler and reinitializes it with a fresh Configuration.
+     *
+     * @param rmContext context passed to reinitialize
+     * @throws IllegalStateException if reinitialize fails with an IOException
+     */
+    public ExcessReduceContainerAllocateScheduler(RMContext rmContext) {
+      super();
+      try {
+        Configuration conf = new Configuration();
+        reinitialize(conf, rmContext);
+      } catch (IOException ie) {
+        // Fail regardless of whether JVM assertions are enabled, and keep
+        // the original cause instead of continuing with a scheduler that
+        // was never reinitialized.
+        throw new IllegalStateException(
+            "Failed to reinitialize ExcessReduceContainerAllocateScheduler",
+            ie);
+      }
+    }
+
+    /**
+     * Delegates to the superclass with a copy of each ask, then appends one
+     * mocked reduce-priority container when the result is non-empty.
+     *
+     * @return a mocked Allocation on which only getContainers() is stubbed
+     */
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      // Pass copies of the requests to the superclass rather than the
+      // caller's own ResourceRequest instances.
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy = ResourceRequest.newInstance(req
+            .getPriority(), req.getResourceName(), req.getCapability(), req
+            .getNumContainers(), req.getRelaxLocality());
+        askCopy.add(reqCopy);
+      }
+      SecurityUtil.setTokenServiceUseIp(false);
+      Allocation normalAlloc = super.allocate(
+          applicationAttemptId, askCopy, release,
+          blacklistAdditions, blacklistRemovals);
+      List<Container> containers = normalAlloc.getContainers();
+      if (!containers.isEmpty()) {
+        // allocate excess container
+        FiCaSchedulerApp application = super.getApplicationAttempt(applicationAttemptId);
+        ContainerId containerId = BuilderUtils.newContainerId(application
+            .getApplicationAttemptId(), application.getNewContainerId());
+        Container excessC = mock(Container.class);
+        when(excessC.getId()).thenReturn(containerId);
+        when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
+        Resource mockR = mock(Resource.class);
+        when(mockR.getMemory()).thenReturn(2048);
+        when(excessC.getResource()).thenReturn(mockR);
+        NodeId nId = mock(NodeId.class);
+        when(nId.getHost()).thenReturn("local");
+        when(excessC.getNodeId()).thenReturn(nId);
+        containers.add(excessC);
+      }
+      Allocation excessAlloc = mock(Allocation.class);
+      when(excessAlloc.getContainers()).thenReturn(containers);
+      return excessAlloc;
+    }
+  }
+
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, String[] hosts) {
return createReq(jobId, taskAttemptId, memory, hosts, false, false);