You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 10:10:15 UTC

[17/50] [abbrv] hadoop git commit: MAPREDUCE-5002. AM could potentially allocate a reduce container to a map attempt. Contributed by Chang Li

MAPREDUCE-5002. AM could potentially allocate a reduce container to a map attempt. Contributed by Chang Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f82f582
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f82f582
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f82f582

Branch: refs/heads/HDFS-7966
Commit: 3f82f582e51c514cc0f052c828c2f58c6e8927ad
Parents: 58d1a02
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 17 18:17:29 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 17 18:17:29 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/rm/RMContainerAllocator.java         |  10 +-
 .../v2/app/rm/TestRMContainerAllocator.java     | 110 +++++++++++++++++++
 3 files changed, 119 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index cde6d92..6cf7abb 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -570,6 +570,9 @@ Release 2.8.0 - UNRELEASED
     position/key information for uncompressed input sometimes. (Zhihai Xu via
     jlowe)
 
+    MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
+    attempt (Chang Li via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index ac4c586..78b0dc4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -1004,6 +1004,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
       containersAllocated += allocatedContainers.size();
+      int reducePending = reduces.size();
       while (it.hasNext()) {
         Container allocated = it.next();
         if (LOG.isDebugEnabled()) {
@@ -1034,13 +1035,14 @@ public class RMContainerAllocator extends RMContainerRequestor
         else if (PRIORITY_REDUCE.equals(priority)) {
           if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
               reduceResourceRequest, getSchedulerResourceTypes()) <= 0
-              || reduces.isEmpty()) {
-            LOG.info("Cannot assign container " + allocated 
+              || (reducePending <= 0)) {
+            LOG.info("Cannot assign container " + allocated
                 + " for a reduce as either "
                 + " container memory less than required " + reduceResourceRequest
-                + " or no pending reduce tasks - reduces.isEmpty=" 
-                + reduces.isEmpty()); 
+                + " or no pending reduce tasks.");
             isAssignable = false;
+          } else {
+            reducePending--;
           }
         } else {
           LOG.warn("Container allocated at unwanted priority: " + priority + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f82f582/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e148c32..c98ccd3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -562,6 +563,52 @@ public class TestRMContainerAllocator {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testExcessReduceContainerAssign() throws Exception {
+  final Configuration conf = new Configuration();
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+    final MyResourceManager2 rm = new MyResourceManager2(conf);
+    rm.start();
+    final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
+            .getDispatcher();
+    final RMApp app = rm.submitApp(2048);
+    dispatcher.await();
+    final String host = "host1";
+    final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
+    nm.nodeHeartbeat(true);
+    dispatcher.await();
+    final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+          .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+    final JobId jobId = MRBuilderUtils
+                 .newJobId(appAttemptId.getApplicationId(), 0);
+    final Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // request to allocate two reduce priority containers
+    final String[] locations = new String[] { host };
+    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    allocator.scheduleAllReduces();
+    allocator.makeRemoteRequest();
+    nm.nodeHeartbeat(true);
+    dispatcher.await();
+    allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+
+    int assignedContainer;
+    for (assignedContainer = 0; assignedContainer < 1;) {
+      assignedContainer += allocator.schedule().size();
+      nm.nodeHeartbeat(true);
+      dispatcher.await();
+    }
+    // only 1 allocated container should be assigned
+    Assert.assertEquals(assignedContainer, 1);
+  }
+
   @Test
   public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
 
@@ -770,6 +817,17 @@ public class TestRMContainerAllocator {
     }
   }
 
+  private static class MyResourceManager2 extends MyResourceManager {
+    public MyResourceManager2(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new ExcessReduceContainerAllocateScheduler(this.getRMContext());
+    }
+  }
+
   @Test
   public void testReportedAppProgress() throws Exception {
 
@@ -1595,6 +1653,58 @@ public class TestRMContainerAllocator {
     }
   }
 
+  private static class ExcessReduceContainerAllocateScheduler extends FifoScheduler {
+
+    public ExcessReduceContainerAllocateScheduler(RMContext rmContext) {
+      super();
+      try {
+        Configuration conf = new Configuration();
+        reinitialize(conf, rmContext);
+      } catch (IOException ie) {
+        LOG.info("add application failed with ", ie);
+        assert (false);
+      }
+    }
+
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy = ResourceRequest.newInstance(req
+            .getPriority(), req.getResourceName(), req.getCapability(), req
+            .getNumContainers(), req.getRelaxLocality());
+        askCopy.add(reqCopy);
+      }
+      SecurityUtil.setTokenServiceUseIp(false);
+      Allocation normalAlloc = super.allocate(
+          applicationAttemptId, askCopy, release,
+          blacklistAdditions, blacklistRemovals);
+      List<Container> containers = normalAlloc.getContainers();
+      if(containers.size() > 0) {
+        // allocate excess container
+        FiCaSchedulerApp application = super.getApplicationAttempt(applicationAttemptId);
+        ContainerId containerId = BuilderUtils.newContainerId(application
+            .getApplicationAttemptId(), application.getNewContainerId());
+        Container excessC = mock(Container.class);
+        when(excessC.getId()).thenReturn(containerId);
+        when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
+        Resource mockR = mock(Resource.class);
+        when(mockR.getMemory()).thenReturn(2048);
+        when(excessC.getResource()).thenReturn(mockR);
+        NodeId nId = mock(NodeId.class);
+        when(nId.getHost()).thenReturn("local");
+        when(excessC.getNodeId()).thenReturn(nId);
+        containers.add(excessC);
+      }
+      Allocation excessAlloc = mock(Allocation.class);
+      when(excessAlloc.getContainers()).thenReturn(containers);
+      return excessAlloc;
+    }
+  }
+
   private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
       int memory, String[] hosts) {
     return createReq(jobId, taskAttemptId, memory, hosts, false, false);