You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/01/10 22:46:14 UTC

hadoop git commit: MAPREDUCE-6926. Allow MR jobs to opt out of oversubscription. Contributed by Haibo Chen.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 b646d79d9 -> 43f51bcdb


MAPREDUCE-6926. Allow MR jobs to opt out of oversubscription. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43f51bcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43f51bcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43f51bcd

Branch: refs/heads/YARN-1011
Commit: 43f51bcdb92db94549c4e472068629f7971285be
Parents: b646d79
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Jan 10 13:21:11 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Wed Jan 10 13:21:11 2018 -0800

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerRequestor.java         |  48 ++---
 .../v2/app/rm/TestRMContainerAllocator.java     | 192 +++++++++++++++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +
 .../src/main/resources/mapred-default.xml       |   8 +
 4 files changed, 231 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index bb3e1fa..d996690 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -111,6 +111,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private final Set<String> blacklistRemovals = Collections
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private boolean optOutOfOversubscription;
 
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -136,9 +137,11 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
         String nodeLabelExpression) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority, nodeLabelExpression);
+          event.getRacks(), priority, System.currentTimeMillis(),
+          nodeLabelExpression);
     }
 
+    @VisibleForTesting
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
                             long requestTimeMs) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
@@ -146,13 +149,6 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
-                            Resource capability, String[] hosts, String[] racks,
-                            Priority priority, String nodeLabelExpression) {
-      this(attemptID, capability, hosts, racks, priority,
-          System.currentTimeMillis(), nodeLabelExpression);
-    }
-
-    public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
         Priority priority, long requestTimeMs,String nodeLabelExpression) {
       this.attemptID = attemptID;
@@ -186,6 +182,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
             MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
             MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+    optOutOfOversubscription = conf.getBoolean(
+        MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+        MRJobConfig.DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT);
+    LOG.info("optOutOfOversubscription is " + optOutOfOversubscription);
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
       throw new YarnRuntimeException("Invalid blacklistDisablePercent: "
           + blacklistDisablePercent
@@ -398,20 +398,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     for (String host : req.hosts) {
       // Data-local
       if (!isNodeBlacklisted(host)) {
-        addResourceRequest(req.priority, host, req.capability,
+        addGuaranteedResourceRequest(req.priority, host, req.capability,
             null);
       }
     }
 
     // Nothing Rack-local for now
     for (String rack : req.racks) {
-      addResourceRequest(req.priority, rack, req.capability,
+      addGuaranteedResourceRequest(req.priority, rack, req.capability,
           null);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.nodeLabelExpression);
+    addGuaranteedResourceRequest(req.priority, ResourceRequest.ANY,
+        req.capability, req.nodeLabelExpression);
   }
 
   protected void decContainerReq(ContainerRequest req) {
@@ -430,18 +430,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   protected void addOpportunisticResourceRequest(Priority priority,
       Resource capability) {
     addResourceRequest(priority, ResourceRequest.ANY, capability, null,
-        ExecutionType.OPPORTUNISTIC);
+        ExecutionType.OPPORTUNISTIC, true);
   }
 
-  private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, String nodeLabelExpression) {
+  private void addGuaranteedResourceRequest(Priority priority,
+      String resourceName, Resource capability, String nodeLabelExpression) {
     addResourceRequest(priority, resourceName, capability, nodeLabelExpression,
-        ExecutionType.GUARANTEED);
+        ExecutionType.GUARANTEED, optOutOfOversubscription);
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
       Resource capability, String nodeLabelExpression,
-      ExecutionType executionType) {
+      ExecutionType executionType, boolean enforceExecutionType) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -464,8 +464,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
       remoteRequest.setNodeLabelExpression(nodeLabelExpression);
-      remoteRequest.setExecutionTypeRequest(
-          ExecutionTypeRequest.newInstance(executionType, true));
+      remoteRequest.setExecutionTypeRequest(ExecutionTypeRequest.
+          newInstance(executionType, enforceExecutionType));
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -473,9 +473,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(remoteRequest);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addResourceRequest:" + " applicationId="
+      LOG.debug("addGuaranteedResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
+          + " resourceName=" + resourceName + " ExecutionType=" + executionType
+          + " enforceExecutionType=" + enforceExecutionType + " numContainers="
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
   }
@@ -559,8 +560,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       }
     }
     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
-    ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
-        hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
+    ContainerRequest newReq = new ContainerRequest(orig.attemptID,
+        orig.capability, hosts, orig.racks, orig.priority,
+        System.currentTimeMillis(), orig.nodeLabelExpression);
     return newReq;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 85e4181..d95793d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -844,6 +846,196 @@ public class TestRMContainerAllocator {
     allocator.close();
   }
 
+  /**
+   * Test that a MapReduce job can be configured to opt out of
+   * oversubscription, that is, it always waits for guaranteed resources to
+   * execute its tasks. This is done by setting up a MapReduce job with 2
+   * mappers and 1 reducer and capturing all ResourceRequests sent from the
+   * AM to RM, then checking that all ResourceRequests are guaranteed and
+   * their enforceExecutionType is true.
+   */
+  @Test
+  public void testMapReduceOptingOutOversubscription() throws Exception {
+    List<ResourceRequest> resourceRequests = captureResourceRequests(true);
+
+    for(ResourceRequest resourceRequest : resourceRequests) {
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, true))) {
+        Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+            " is not guaranteed or not enforced.");
+      }
+    }
+  }
+
+  /**
+   * Test that a MapReduce job can be configured to opt in to
+   * oversubscription (the default). This is done by setting up a MapReduce
+   * job with 2 mappers and 1 reducer and capturing all ResourceRequests sent
+   * from the AM to RM, then checking that all ResourceRequests are guaranteed
+   * but their enforceExecutionType is always set to false.
+   */
+  @Test
+  public void testMapReduceOptingInOversubscription() throws Exception {
+    List<ResourceRequest> resourceRequests = captureResourceRequests(false);
+
+    for(ResourceRequest resourceRequest : resourceRequests) {
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, false))) {
+        Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+            " is not guaranteed or it is enforced.");
+      }
+    }
+  }
+
+  /**
+   * Set up a mapreduce job with 2 mappers and 1 reducer and return
+   * all ResourceRequests sent from the AM to RM.
+   */
+  private List<ResourceRequest> captureResourceRequests(
+      boolean optOutOfOversubscription) throws Exception {
+    List<ResourceRequest> resourceRequests = new ArrayList<>();
+
+    final Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+        optOutOfOversubscription);
+
+    // start the resource manager
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // submit an application
+    RMApp rmApp = rm.submitApp(1024);
+    rm.drainEvents();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    rm.drainEvents();
+
+    // start the MR AM and wait until it is in running state
+    MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
+        appAttemptId, 0), 2, 1, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+      protected ContainerAllocator createContainerAllocator(
+          ClientService clientService, AppContext context) {
+        return new MyContainerAllocator(rm, appAttemptId, context);
+      };
+    };
+    mrApp.submit(conf);
+
+    Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+        .getValue();
+    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+        .getContainerAllocator();
+    mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING);
+    amDispatcher.await();
+
+    // wait until all task attempts have requested containers
+    for (Task t : job.getTasks().values()) {
+      mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+          .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
+    }
+    amDispatcher.await();
+
+    // send map resource requests to RM
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // wait for both map tasks to be running
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.MAP) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
+
+    // finish both map tasks so that the reduce task can be scheduled
+    Iterator<Task> it = job.getTasks().values().iterator();
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // send the reduce resource requests to RM
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // wait for the reduce task to be running
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.REDUCE) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
+
+    // finish the reduce task
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    return resourceRequests;
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 6acf1bc..569ef9d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1165,6 +1165,12 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0;
 
   /**
+   * Opt out of YARN oversubscription so that the job always waits for
+   * GUARANTEED resources to become available in the cluster.
+   */
+  String MR_OVERSUBSCRIPTION_OPT_OUT = "mapreduce.job.oversubscription-opt-out";
+  boolean DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT = false;
+  /**
    * A comma-separated list of properties whose value will be redacted.
    */
   String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1e432ce..00ee7d4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2065,4 +2065,12 @@
   </description>
 </property>
 
+<property>
+  <description>
+    Opts out of YARN oversubscription so that the job always waits for GUARANTEED
+    resources to become available.
+  </description>
+  <name>mapreduce.job.oversubscription-opt-out</name>
+  <value>false</value>
+</property>
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org