You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:07:53 UTC
[31/50] [abbrv] hadoop git commit: YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6fa51cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6fa51cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6fa51cb
Branch: refs/heads/YARN-1011
Commit: d6fa51cb1f0b37b7355a6cb773387c6a0b953450
Parents: dcb3709
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Nov 22 09:03:05 2017 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 21 16:28:47 2018 -0700
----------------------------------------------------------------------
.../scheduler/common/PendingAsk.java | 15 +++-
.../scheduler/fair/FSAppAttempt.java | 5 ++
.../LocalityAppPlacementAllocator.java | 27 ++++++-
.../scheduler/fair/FairSchedulerTestBase.java | 40 ++++++++--
.../scheduler/fair/TestFairScheduler.java | 77 ++++++++++++++++++++
5 files changed, 153 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 2ed3e83..470dbbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -30,16 +30,21 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class PendingAsk {
private final Resource perAllocationResource;
private final int count;
- public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+ public final static PendingAsk ZERO =
+ new PendingAsk(Resources.none(), 0, false);
+
+ private final boolean isGuaranteedTypeEnforced;
public PendingAsk(ResourceSizing sizing) {
this.perAllocationResource = sizing.getResources();
this.count = sizing.getNumAllocations();
+ this.isGuaranteedTypeEnforced = true; // sizing-based asks always enforce the GUARANTEED type
}
- public PendingAsk(Resource res, int num) {
+ public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) {
this.perAllocationResource = res;
this.count = num;
+ this.isGuaranteedTypeEnforced = guaranteedTypeEnforced;
}
public Resource getPerAllocationResource() {
@@ -50,11 +55,17 @@ public class PendingAsk {
return count;
}
+ public boolean isGuaranteedTypeEnforced() {
+ return isGuaranteedTypeEnforced;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("<per-allocation-resource=");
sb.append(getPerAllocationResource());
+ sb.append(", isGuaranteedEnforced=");
+ sb.append(isGuaranteedTypeEnforced());
sb.append(",repeat=");
sb.append(getCount());
sb.append(">");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 1928591..0effd42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -859,6 +859,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
boolean reserved, boolean opportunistic,
SchedulerRequestKey schedulerKey) {
+ if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) {
+ // do not attempt to assign an OPPORTUNISTIC container to a resource
+ // request that has explicitly opted out of oversubscription
+ return Resources.none();
+ }
// How much does this request need?
Resource capability = pendingAsk.getPerAllocationResource();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 4557350..68d53e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
@@ -188,13 +190,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
PendingAsk lastPendingAsk =
lastRequest == null ? null : new PendingAsk(
- lastRequest.getCapability(), lastRequest.getNumContainers());
+ lastRequest.getCapability(), lastRequest.getNumContainers(),
+ enforceGuaranteedExecutionType(lastRequest));
String lastRequestedNodePartition =
lastRequest == null ? null : lastRequest.getNodeLabelExpression();
updateResult = new PendingAskUpdateResult(lastPendingAsk,
new PendingAsk(request.getCapability(),
- request.getNumContainers()), lastRequestedNodePartition,
+ request.getNumContainers(),
+ enforceGuaranteedExecutionType(request)),
+ lastRequestedNodePartition,
request.getNodeLabelExpression());
}
}
@@ -234,8 +239,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
if (null == request) {
return PendingAsk.ZERO;
} else{
+ boolean guaranteedEnforced = enforceGuaranteedExecutionType(request);
return new PendingAsk(request.getCapability(),
- request.getNumContainers());
+ request.getNumContainers(), guaranteedEnforced);
}
} finally {
readLock.unlock();
@@ -243,6 +249,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
}
+ /**
+ * Determine whether the GUARANTEED execution type of a ResourceRequest
+ * must be enforced when the request is turned into a PendingAsk.
+ * @param request resource request to inspect
+ * @return true only if the request carries an ExecutionTypeRequest whose
+ * type is GUARANTEED and whose enforce flag is set; false otherwise,
+ * including when no ExecutionTypeRequest is present */
+ private static boolean enforceGuaranteedExecutionType(
+ ResourceRequest request) {
+ ExecutionTypeRequest executionType = request.getExecutionTypeRequest();
+ return executionType != null &&
+ executionType.getExecutionType() == ExecutionType.GUARANTEED &&
+ executionType.getEnforceExecutionType();
+ }
+
@Override
public int getOutstandingAsksCount(String resourceName) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 3ac3849..43a3931 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -117,9 +119,15 @@ public class FairSchedulerTestBase {
relaxLocality);
}
+ protected ResourceRequest createResourceRequest(int memory, int vcores,
+ String host, int priority, int numContainers, boolean relaxLocality) {
+ return createResourceRequest(memory, vcores, host, priority,
+ numContainers, relaxLocality, false);
+ }
+
protected ResourceRequest createResourceRequest(
int memory, int vcores, String host, int priority, int numContainers,
- boolean relaxLocality) {
+ boolean relaxLocality, boolean guaranteedExecutionEnforced) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(BuilderUtils.newResource(memory, vcores));
request.setResourceName(host);
@@ -129,6 +137,11 @@ public class FairSchedulerTestBase {
request.setPriority(prio);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+ if (guaranteedExecutionEnforced) {
+ ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance(
+ ExecutionType.GUARANTEED, true);
+ request.setExecutionTypeRequest(executionType);
+ }
return request;
}
@@ -152,6 +165,13 @@ public class FairSchedulerTestBase {
}
protected ApplicationAttemptId createSchedulingRequest(
+ int memory, String queueId, String userId,
+ int numContainers, boolean guaranteedExecutionEnforced) {
+ return createSchedulingRequest(memory, 1, queueId,
+ userId, numContainers, 1, guaranteedExecutionEnforced);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers) {
return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
}
@@ -165,16 +185,24 @@ public class FairSchedulerTestBase {
protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
- ResourceRequest request = createResourceRequest(memory, vcores,
- ResourceRequest.ANY, priority, numContainers, true);
+ return createSchedulingRequest(memory, vcores, queueId,
+ userId, numContainers, priority, false);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, int vcores, String queueId, String userId, int numContainers,
+ int priority, boolean guaranteedExecutionEnforced) {
+ ResourceRequest request = createResourceRequest(
+ memory, vcores, ResourceRequest.ANY, priority,
+ numContainers, true, guaranteedExecutionEnforced);
return createSchedulingRequest(Lists.newArrayList(request), queueId,
- userId);
+ userId);
}
protected ApplicationAttemptId createSchedulingRequest(
Collection<ResourceRequest> requests, String queueId, String userId) {
- ApplicationAttemptId id =
- createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+ ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+ this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6fa51cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ff620b8..fbb7243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2712,6 +2712,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
getPriority().getPriority());
}
+ @Test
+ public void testResourceRequestOptOutOfOversubscription() throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable request normalization; original values are restored in finally
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that leaves some unallocated resources
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3600, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container runs on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+ // create another scheduling request that opts out of oversubscription and
+ // asks for more than what's left unallocated on the node.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(1536, "queue2", "user1", 1, true);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertEquals(0, allocatedContainers2.size());
+
+ // verify that a reservation is made for the second request
+ assertEquals("A reservation should be made for the second request",
+ Resource.newInstance(1536, 1), scheduler.getNode(node.getNodeID()).
+ getReservedContainer().getReservedResource());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
/**
* Test that NO OPPORTUNISTIC containers can be allocated on a node that
* is fully allocated and with a very high utilization.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org