You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:07:53 UTC

[31/50] [abbrv] hadoop git commit: YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.

YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6fa51cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6fa51cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6fa51cb

Branch: refs/heads/YARN-1011
Commit: d6fa51cb1f0b37b7355a6cb773387c6a0b953450
Parents: dcb3709
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Nov 22 09:03:05 2017 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 21 16:28:47 2018 -0700

----------------------------------------------------------------------
 .../scheduler/common/PendingAsk.java            | 15 +++-
 .../scheduler/fair/FSAppAttempt.java            |  5 ++
 .../LocalityAppPlacementAllocator.java          | 27 ++++++-
 .../scheduler/fair/FairSchedulerTestBase.java   | 40 ++++++++--
 .../scheduler/fair/TestFairScheduler.java       | 77 ++++++++++++++++++++
 5 files changed, 153 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 2ed3e83..470dbbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -30,16 +30,21 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class PendingAsk {
   private final Resource perAllocationResource;
   private final int count;
-  public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+  public final static PendingAsk ZERO =
+      new PendingAsk(Resources.none(), 0, false);
+
+  private final boolean isGuaranteedTypeEnforced;
 
   public PendingAsk(ResourceSizing sizing) {
     this.perAllocationResource = sizing.getResources();
     this.count = sizing.getNumAllocations();
+    this.isGuaranteedTypeEnforced = true;
   }
 
-  public PendingAsk(Resource res, int num) {
+  public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) {
     this.perAllocationResource = res;
     this.count = num;
+    this.isGuaranteedTypeEnforced = guaranteedTypeEnforced;
   }
 
   public Resource getPerAllocationResource() {
@@ -50,11 +55,17 @@ public class PendingAsk {
     return count;
   }
 
+  public boolean isGuaranteedTypeEnforced() {
+    return isGuaranteedTypeEnforced;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("<per-allocation-resource=");
     sb.append(getPerAllocationResource());
+    sb.append(", isGuaranteedEnforced=");
+    sb.append(isGuaranteedTypeEnforced());
     sb.append(",repeat=");
     sb.append(getCount());
     sb.append(">");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 1928591..0effd42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -859,6 +859,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
       boolean reserved, boolean opportunistic,
       SchedulerRequestKey schedulerKey) {
+    if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) {
+      // do not attempt to assign an OPPORTUNISTIC container to a resource
+      // request that has explicitly opted out of oversubscription
+      return Resources.none();
+    }
 
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 4557350..68d53e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
@@ -188,13 +190,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
 
           PendingAsk lastPendingAsk =
               lastRequest == null ? null : new PendingAsk(
-                  lastRequest.getCapability(), lastRequest.getNumContainers());
+                  lastRequest.getCapability(), lastRequest.getNumContainers(),
+                  enforceGuaranteedExecutionType(lastRequest));
           String lastRequestedNodePartition =
               lastRequest == null ? null : lastRequest.getNodeLabelExpression();
 
           updateResult = new PendingAskUpdateResult(lastPendingAsk,
               new PendingAsk(request.getCapability(),
-                  request.getNumContainers()), lastRequestedNodePartition,
+                  request.getNumContainers(),
+                  enforceGuaranteedExecutionType(request)),
+                  lastRequestedNodePartition,
               request.getNodeLabelExpression());
         }
       }
@@ -234,8 +239,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       if (null == request) {
         return PendingAsk.ZERO;
       } else{
+        boolean guaranteedEnforced = enforceGuaranteedExecutionType(request);
         return new PendingAsk(request.getCapability(),
-            request.getNumContainers());
+            request.getNumContainers(), guaranteedEnforced);
       }
     } finally {
       readLock.unlock();
@@ -243,6 +249,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
 
   }
 
+  /**
+   * Check for a given ResourceRequest, if its guaranteed execution type
+   * needs to be enforced.
+   * @param request resource request
+   * @return true if its guaranteed execution type is to be enforced.
+   *         false otherwise
+   */
+  private static boolean enforceGuaranteedExecutionType(
+      ResourceRequest request) {
+    ExecutionTypeRequest executionType = request.getExecutionTypeRequest();
+    return executionType != null &&
+        executionType.getExecutionType() == ExecutionType.GUARANTEED &&
+        executionType.getEnforceExecutionType();
+  }
+
   @Override
   public int getOutstandingAsksCount(String resourceName) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 3ac3849..43a3931 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -117,9 +119,15 @@ public class FairSchedulerTestBase {
         relaxLocality);
   }
 
+  protected ResourceRequest createResourceRequest(int memory, int vcores,
+      String host, int priority, int numContainers, boolean relaxLocality) {
+    return createResourceRequest(memory, vcores, host, priority,
+        numContainers, relaxLocality, false);
+  }
+
   protected ResourceRequest createResourceRequest(
       int memory, int vcores, String host, int priority, int numContainers,
-      boolean relaxLocality) {
+      boolean relaxLocality, boolean guaranteedExecutionEnforced) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
@@ -129,6 +137,11 @@ public class FairSchedulerTestBase {
     request.setPriority(prio);
     request.setRelaxLocality(relaxLocality);
     request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    if (guaranteedExecutionEnforced) {
+      ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, true);
+      request.setExecutionTypeRequest(executionType);
+    }
     return request;
   }
 
@@ -152,6 +165,13 @@ public class FairSchedulerTestBase {
   }
 
   protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId,
+      int numContainers, boolean guaranteedExecutionEnforced) {
+    return createSchedulingRequest(memory, 1, queueId,
+        userId, numContainers, 1, guaranteedExecutionEnforced);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers) {
     return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
   }
@@ -165,16 +185,24 @@ public class FairSchedulerTestBase {
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
-    ResourceRequest request = createResourceRequest(memory, vcores,
-            ResourceRequest.ANY, priority, numContainers, true);
+    return createSchedulingRequest(memory, vcores, queueId,
+        userId, numContainers, priority, false);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority, boolean guaranteedExecutionEnforced) {
+    ResourceRequest request = createResourceRequest(
+        memory, vcores, ResourceRequest.ANY, priority,
+        numContainers, true, guaranteedExecutionEnforced);
     return createSchedulingRequest(Lists.newArrayList(request), queueId,
-            userId);
+        userId);
   }
 
   protected ApplicationAttemptId createSchedulingRequest(
       Collection<ResourceRequest> requests, String queueId, String userId) {
-    ApplicationAttemptId id =
-        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ff620b8..fbb7243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2712,6 +2712,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             getPriority().getPriority());
   }
 
+  @Test
+  public void testResourceRequestOptOutOfOversubscription() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3600, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that opts out of oversubscription and
+      // asks for more than what's left unallocated on the node.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1536, "queue2", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 0);
+
+      // verify that a reservation is made for the second request
+      assertTrue("A reservation should be made for the second request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(1536, 1)));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   /**
    * Test that NO OPPORTUNISTIC containers can be allocated on a node that
    * is fully allocated and with a very high utilization.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org