You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/01/03 21:32:20 UTC

[48/50] [abbrv] hadoop git commit: YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.

YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05b729c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05b729c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05b729c2

Branch: refs/heads/YARN-1011
Commit: 05b729c2c56b291f3c8bdfc7d6c7d3b502f219f5
Parents: 0af7353
Author: Miklos Szegedi <mi...@cloudera.com>
Authored: Wed Nov 22 09:03:05 2017 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Wed Jan 3 12:30:03 2018 -0800

----------------------------------------------------------------------
 .../scheduler/common/PendingAsk.java            | 14 +++-
 .../scheduler/fair/FSAppAttempt.java            |  5 ++
 .../LocalityAppPlacementAllocator.java          | 27 ++++++-
 .../scheduler/fair/FairSchedulerTestBase.java   | 34 ++++++++-
 .../scheduler/fair/TestFairScheduler.java       | 77 ++++++++++++++++++++
 5 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 85d8715..e9931a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -29,11 +29,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class PendingAsk {
   private final Resource perAllocationResource;
   private final int count;
-  public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+  private final boolean isGuaranteedTypeEnforced;
 
-  public PendingAsk(Resource res, int num) {
+  public final static PendingAsk ZERO =
+      new PendingAsk(Resources.none(), 0, false);
+
+  public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) {
     this.perAllocationResource = res;
     this.count = num;
+    this.isGuaranteedTypeEnforced = guaranteedTypeEnforced;
   }
 
   public Resource getPerAllocationResource() {
@@ -44,11 +48,17 @@ public class PendingAsk {
     return count;
   }
 
+  public boolean isGuaranteedTypeEnforced() {
+    return isGuaranteedTypeEnforced;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("<per-allocation-resource=");
     sb.append(getPerAllocationResource());
+    sb.append(", isGuaranteedEnforced=");
+    sb.append(isGuaranteedTypeEnforced());
     sb.append(",repeat=");
     sb.append(getCount());
     sb.append(">");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 2bdac0a..8a89f78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -860,6 +860,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
       boolean reserved, boolean opportunistic,
       SchedulerRequestKey schedulerKey) {
+    if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) {
+      // do not attempt to assign an OPPORTUNISTIC container to a resource
+      // request that has explicitly opted out of oversubscription
+      return Resources.none();
+    }
 
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 766827c..88a6acb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -159,13 +161,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
 
           PendingAsk lastPendingAsk =
               lastRequest == null ? null : new PendingAsk(
-                  lastRequest.getCapability(), lastRequest.getNumContainers());
+                  lastRequest.getCapability(), lastRequest.getNumContainers(),
+                  enforceGuaranteedExecutionType(lastRequest));
           String lastRequestedNodePartition =
               lastRequest == null ? null : lastRequest.getNodeLabelExpression();
 
           updateResult = new PendingAskUpdateResult(lastPendingAsk,
               new PendingAsk(request.getCapability(),
-                  request.getNumContainers()), lastRequestedNodePartition,
+                  request.getNumContainers(),
+                  enforceGuaranteedExecutionType(request)),
+                  lastRequestedNodePartition,
               request.getNodeLabelExpression());
         }
       }
@@ -192,8 +197,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       if (null == request) {
         return PendingAsk.ZERO;
       } else{
+        boolean guaranteedEnforced = enforceGuaranteedExecutionType(request);
         return new PendingAsk(request.getCapability(),
-            request.getNumContainers());
+            request.getNumContainers(), guaranteedEnforced);
       }
     } finally {
       readLock.unlock();
@@ -201,6 +207,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
 
   }
 
+  /**
+   * Check for a given ResourceRequest, if its guaranteed execution type
+   * needs to be enforced.
+   * @param request resource request
+   * @return true if its guaranteed execution type is to be enforced.
+   *         false otherwise
+   */
+  private static boolean enforceGuaranteedExecutionType(
+      ResourceRequest request) {
+    ExecutionTypeRequest executionType = request.getExecutionTypeRequest();
+    return executionType != null &&
+        executionType.getExecutionType() == ExecutionType.GUARANTEED &&
+        executionType.getEnforceExecutionType();
+  }
+
   @Override
   public int getOutstandingAsksCount(String resourceName) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index af4e1dd..6d9df4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -115,9 +117,15 @@ public class FairSchedulerTestBase {
         relaxLocality);
   }
 
+  protected ResourceRequest createResourceRequest(int memory, int vcores,
+      String host, int priority, int numContainers, boolean relaxLocality) {
+    return createResourceRequest(memory, vcores, host, priority,
+        numContainers, relaxLocality, false);
+  }
+
   protected ResourceRequest createResourceRequest(
       int memory, int vcores, String host, int priority, int numContainers,
-      boolean relaxLocality) {
+      boolean relaxLocality, boolean guaranteedExecutionEnforced) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
@@ -127,6 +135,11 @@ public class FairSchedulerTestBase {
     request.setPriority(prio);
     request.setRelaxLocality(relaxLocality);
     request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    if (guaranteedExecutionEnforced) {
+      ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, true);
+      request.setExecutionTypeRequest(executionType);
+    }
     return request;
   }
 
@@ -150,6 +163,13 @@ public class FairSchedulerTestBase {
   }
 
   protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId,
+      int numContainers, boolean guaranteedExecutionEnforced) {
+    return createSchedulingRequest(memory, 1, queueId,
+        userId, numContainers, 1, guaranteedExecutionEnforced);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers) {
     return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
   }
@@ -163,6 +183,13 @@ public class FairSchedulerTestBase {
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
+    return createSchedulingRequest(memory, vcores, queueId, userId,
+        numContainers, priority, false);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority, boolean guaranteedExecutionEnforced) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
@@ -171,8 +198,9 @@ public class FairSchedulerTestBase {
       scheduler.addApplicationAttempt(id, false, false);
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
+    ResourceRequest request = createResourceRequest(memory, vcores,
+        ResourceRequest.ANY, priority, numContainers, true,
+        guaranteedExecutionEnforced);
     ask.add(request);
 
     RMApp rmApp = mock(RMApp.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e70053c..d533617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2704,6 +2704,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             getPriority().getPriority());
   }
 
+  @Test
+  public void testResourceRequestOptOutOfOversubscription() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3600, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that opts out of oversubscription and
+      // asks for more than what's left unallocated on the node.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1536, "queue2", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 0);
+
+      // verify that a reservation is made for the second request
+      assertTrue("A reservation should be made for the second request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(1536, 1)));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   /**
    * Test that NO OPPORTUNISTIC containers can be allocated on a node that
    * is fully allocated and with a very high utilization.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org